Merge branch 'main' into fix_3280

adiholden authored Jul 8, 2024
2 parents f29b020 + fba902d commit 44160ae
Showing 15 changed files with 243 additions and 118 deletions.
73 changes: 73 additions & 0 deletions .github/workflows/scorecard.yml
@@ -0,0 +1,73 @@
+# This workflow uses actions that are not certified by GitHub. They are provided
+# by a third-party and are governed by separate terms of service, privacy
+# policy, and support documentation.
+
+name: Scorecard supply-chain security
+on:
+  # For Branch-Protection check. Only the default branch is supported. See
+  # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection
+  branch_protection_rule:
+  # To guarantee Maintained check is occasionally updated. See
+  # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained
+  schedule:
+    - cron: '43 4 * * 1'
+  push:
+    branches: [ "main" ]
+
+# Declare default permissions as read only.
+permissions: read-all
+
+jobs:
+  analysis:
+    name: Scorecard analysis
+    runs-on: ubuntu-latest
+    permissions:
+      # Needed to upload the results to code-scanning dashboard.
+      security-events: write
+      # Needed to publish results and get a badge (see publish_results below).
+      id-token: write
+      # Uncomment the permissions below if installing in a private repository.
+      # contents: read
+      # actions: read
+
+    steps:
+      - name: "Checkout code"
+        uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
+        with:
+          persist-credentials: false
+
+      - name: "Run analysis"
+        uses: ossf/scorecard-action@0864cf19026789058feabb7e87baa5f140aac736 # v2.3.1
+        with:
+          results_file: results.sarif
+          results_format: sarif
+          # (Optional) "write" PAT token. Uncomment the `repo_token` line below if:
+          # - you want to enable the Branch-Protection check on a *public* repository, or
+          # - you are installing Scorecard on a *private* repository
+          # To create the PAT, follow the steps in https://github.com/ossf/scorecard-action?tab=readme-ov-file#authentication-with-fine-grained-pat-optional.
+          # repo_token: ${{ secrets.SCORECARD_TOKEN }}
+
+          # Public repositories:
+          #   - Publish results to OpenSSF REST API for easy access by consumers
+          #   - Allows the repository to include the Scorecard badge.
+          #   - See https://github.com/ossf/scorecard-action#publishing-results.
+          # For private repositories:
+          #   - `publish_results` will always be set to `false`, regardless
+          #     of the value entered here.
+          publish_results: true
+
+      # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
+      # format to the repository Actions tab.
+      - name: "Upload artifact"
+        uses: actions/upload-artifact@97a0fba1372883ab732affbe8f94b823f91727db # v3.pre.node20
+        with:
+          name: SARIF file
+          path: results.sarif
+          retention-days: 5
+
+      # Upload the results to GitHub's code scanning dashboard (optional).
+      # Commenting out will disable upload of results to your repo's Code Scanning dashboard
+      - name: "Upload to code-scanning"
+        uses: github/codeql-action/upload-sarif@1b1aada464948af03b950897e5eb522f92603cc2 # v3.24.9
+        with:
+          sarif_file: results.sarif
15 changes: 8 additions & 7 deletions LICENSE.md
@@ -22,7 +22,7 @@ derivative works, redistribute, and make non-production use of the
 Licensed Work. The Licensor may make an Additional Use Grant, above,
 permitting limited production use.
 
-Effective on the Change Date, or the forth anniversary of the first
+Effective on the Change Date, or the fourth anniversary of the first
 publicly available distribution of a specific version of the Licensed
 Work under this License, whichever comes first, the Licensor hereby
 grants you rights under the terms of the Change License, and the rights
@@ -50,9 +50,10 @@ and all other versions of the Licensed Work.
 
 This License does not grant you any right in any trademark or logo of
 Licensor or its affiliates (provided that you may use a trademark or
-logo of Licensor as expressly required by this License).TO THE EXTENT
-PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON AN “AS IS”
-BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, EXPRESS
-OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
-TITLE.
+logo of Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED
+ON AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND
+CONDITIONS, EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION)
+WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
+NON-INFRINGEMENT, AND TITLE.
39 changes: 20 additions & 19 deletions src/server/db_slice.cc
@@ -708,14 +708,10 @@ void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) {
 }
 
 void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
-  // Async cleanup can only be performed if no tiered entries exist
-  bool async_cleanup = true;
-  for (DbIndex index : indexes) {
-    async_cleanup &= db_arr_[index]->stats.tiered_entries == 0;
-  }
+  bool clear_tiered = owner_->tiered_storage() != nullptr;
 
-  if (!async_cleanup)
-    ClearEntriesOnFlush(indexes, db_arr_, false);
+  if (clear_tiered)
+    ClearOffloadedEntries(indexes, db_arr_);
 
   DbTableArray flush_db_arr(db_arr_.size());
   for (DbIndex index : indexes) {
@@ -729,9 +725,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
   }
 
   CHECK(fetched_items_.empty());
-  auto cb = [this, async_cleanup, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
-    if (async_cleanup)
-      ClearEntriesOnFlush(indexes, flush_db_arr, true);
+  auto cb = [this, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
     flush_db_arr.clear();
     ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
                                           ServerState::kGlibcmalloc);
@@ -1408,24 +1402,31 @@ void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) {
   }
 }
 
-void DbSlice::ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
-                                  bool async) {
-  for (auto index : indices) {
+void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr) {
+  // Currently being used only for tiered storage.
+  TieredStorage* tiered_storage = shard_owner()->tiered_storage();
+  string scratch;
+  for (DbIndex index : indices) {
     const auto& db_ptr = db_arr[index];
-    if (!db_ptr || db_ptr->stats.tiered_entries == 0)
+    if (!db_ptr)
       continue;
 
     // Delete all tiered entries
     PrimeTable::Cursor cursor;
     do {
       cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
-        if (it->second.IsExternal())
-          PerformDeletion(it, db_ptr.get());
+        if (it->second.IsExternal()) {
+          tiered_storage->Delete(index, &it->second);
+        } else if (it->second.HasIoPending()) {
+          tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
+        }
       });
-    } while (cursor && db_ptr->stats.tiered_entries > 0);
+    } while (cursor);
 
-    // Wait for delete operations to finish in sync
-    while (!async && db_ptr->stats.tiered_entries > 0) {
+    // Wait for delete operations to finish in sync.
+    // TODO: the logic inside tiered_storage that updates tiered_entries is somewhat fragile.
+    // To revisit it, otherwise we may have deadlocks around this code.
+    while (db_ptr->stats.tiered_entries > 0) {
      LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
       ThisFiber::SleepFor(1ms);
     }
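Note on the change above: the flush path no longer branches on an async_cleanup flag. When tiered storage is enabled, offloaded entries are cleared synchronously before the table swap, and the detached fiber afterwards only releases memory. Below is a minimal, self-contained sketch of that traverse-delete-then-wait pattern; all types here (Entry, Table) are hypothetical stand-ins for illustration, not Dragonfly's actual PrimeTable/TieredStorage API.

```cpp
// Minimal sketch of the ClearOffloadedEntries pattern, under simplified assumptions.
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

struct Entry {
  bool is_external = false;  // value offloaded to disk
  bool io_pending = false;   // stash to disk still in flight
};

struct Table {
  std::vector<Entry> entries;
  int tiered_entries = 0;  // updated by the (possibly asynchronous) delete path

  // Stand-in for PrimeTable::Traverse: visits one entry per call and
  // returns false once the scan is complete.
  bool Traverse(size_t* cursor, const std::function<void(Entry&)>& cb) {
    if (*cursor >= entries.size())
      return false;
    cb(entries[(*cursor)++]);
    return *cursor < entries.size();
  }
};

// Mirrors the shape of the new code: drop offloaded values, cancel in-flight
// stashes, then wait until every tiered delete has completed.
void ClearOffloaded(Table* table) {
  size_t cursor = 0;
  bool more;
  do {
    more = table->Traverse(&cursor, [table](Entry& e) {
      if (e.is_external) {
        e.is_external = false;  // stand-in for TieredStorage::Delete
        table->tiered_entries--;
      } else if (e.io_pending) {
        e.io_pending = false;   // stand-in for TieredStorage::CancelStash
      }
    });
  } while (more);

  // Synchronous wait, as in the patch; real deletes complete on the I/O path,
  // hence the sleep-and-recheck loop rather than an assertion.
  while (table->tiered_entries > 0)
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
```

In the real code the wait matters because TieredStorage::Delete completes asynchronously, which is why the patch keeps the LOG_EVERY_T warning and a fiber sleep instead of a busy loop.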
6 changes: 2 additions & 4 deletions src/server/db_slice.h
@@ -494,10 +494,8 @@ class DbSlice {
   // Invalidate all watched keys for given slots. Used on FlushSlots.
   void InvalidateSlotWatches(const cluster::SlotSet& slot_ids);
 
-  // Properly clear db_arr before deleting it. If async is set, it's called from a detached fiber
-  // after swapping the db.
-  void ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
-                           bool async);
+  // Clear tiered storage entries for the specified indices.
+  void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);
 
   void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table);
 
7 changes: 7 additions & 0 deletions src/server/dragonfly_test.cc
@@ -362,6 +362,13 @@ TEST_F(DflyEngineTest, MemcacheFlags) {
   ASSERT_EQ(resp, "OK");
   MCResponse resp2 = RunMC(MP::GET, "key");
   EXPECT_THAT(resp2, ElementsAre("VALUE key 42 3", "bar", "END"));
+
+  ASSERT_EQ(Run("resp", {"flushdb"}), "OK");
+  pp_->AwaitFiberOnAll([](auto*) {
+    if (auto* shard = EngineShard::tlocal(); shard) {
+      EXPECT_EQ(shard->db_slice().GetDBTable(0)->mcflag.size(), 0u);
+    }
+  });
 }
 
 TEST_F(DflyEngineTest, LimitMemory) {
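The added assertion covers the case where FLUSHDB could leave stale entries in the per-table memcached-flag map. A tiny self-contained illustration of the invariant being tested, using a hypothetical stand-in for DbTable rather than the real test fixture (which runs the check on every engine shard via AwaitFiberOnAll):

```cpp
// Minimal sketch, assuming a hypothetical DbTable stand-in; not the real fixture.
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

struct DbTable {
  std::unordered_map<std::string, uint32_t> mcflag;  // memcached flags per key
};

// A flush must clear auxiliary per-table state, not just the key space.
void FlushTable(DbTable* table) {
  table->mcflag.clear();
}

int main() {
  DbTable table;
  table.mcflag["key"] = 42;  // set through the memcached protocol with flags=42
  FlushTable(&table);
  assert(table.mcflag.empty());  // the invariant the new test asserts per shard
}
```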
121 changes: 64 additions & 57 deletions src/server/tiered_storage.cc
@@ -60,25 +60,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
   }
 
-  // Called before overriding value with segment
-  void RecordAdded(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
-    stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
-    stats->tiered_entries++;
-    stats->tiered_used_bytes += segment.length;
-  }
-
-  // Called after setting new value in place of previous segment
-  void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
-    stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
-    stats->tiered_entries--;
-    stats->tiered_used_bytes -= segment.length;
-  }
-
   // Find entry by key in db_slice and store external segment in place of original value.
   // Update memory stats
   void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv) {
-      RecordAdded(db_slice_->MutableStats(key.first), *pv, segment);
+      RecordAdded(db_slice_->MutableStats(key.first), *pv, segment.length);
 
       pv->SetIoPending(false);
       pv->SetExternal(segment.offset, segment.length);
@@ -113,9 +99,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     if (!value.empty())
       pv->SetString(value);
 
-    RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment);
-
-    (value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
+    RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment.length);
   }
 
   // Find entry by key and store it's up-to-date value in place of external segment.
@@ -129,25 +113,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   }
 
   // Load all values from bin by their hashes
-  void Defragment(tiering::DiskSegment segment, string_view value) {
-    // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
-    for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
-      // Search for key with the same hash and value pointing to the same segment.
-      // If it still exists, it must correspond to the value stored in this bin
-      auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
-        return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
-      };
-      auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
-      if (!IsValid(it))
-        continue;
-
-      stats_.total_defrags++;
-
-      // Cut out relevant part of value and restore it to memory
-      string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
-      SetInMemory(&it->second, dbid, sub_value, sub_segment);
-    }
-  }
+  void Defragment(tiering::DiskSegment segment, string_view value);
 
   void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override {
     if (ec) {
@@ -159,21 +125,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   }
 
   bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
-                     bool modified) override {
-    if (id == EntryId{kFragmentedBin}) {  // Generally we read whole bins only for defrag
-      Defragment(segment, value);
-      return true;  // delete
-    }
-
-    if (!modified && !cache_fetched_)
-      return false;
-
-    if (SliceSnapshot::IsSnaphotInProgress())
-      return false;
-
-    SetInMemory(get<OpManager::KeyRef>(id), value, segment);
-    return true;
-  }
+                     bool modified) override;
 
   bool ReportDelete(tiering::DiskSegment segment) override {
     if (OccupiesWholePages(segment.length))
@@ -203,17 +155,70 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     return IsValid(it) ? &it->second : nullptr;
   }
 
+  // Called before overriding value with segment
+  void RecordAdded(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
+    stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
+    stats->tiered_entries++;
+    stats->tiered_used_bytes += tiered_len;
+  }
+
+  // Called after setting new value in place of previous segment
+  void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
+    stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
+    stats->tiered_entries--;
+    stats->tiered_used_bytes -= tiered_len;
+  }
+
   bool cache_fetched_ = false;
 
   struct {
-    size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
-    size_t total_defrags = 0;  // included in total_fetches
+    size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
+    size_t total_defrags = 0;
   } stats_;
 
   TieredStorage* ts_;
   DbSlice* db_slice_;
 };
 
+void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view value) {
+  // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
+  for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
+    // Search for key with the same hash and value pointing to the same segment.
+    // If it still exists, it must correspond to the value stored in this bin
+    auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
+      return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
+    };
+    auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
+    if (!IsValid(it))
+      continue;
+
+    stats_.total_defrags++;
+
+    // Cut out relevant part of value and restore it to memory
+    string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
+    SetInMemory(&it->second, dbid, sub_value, sub_segment);
+  }
+}
+
+bool TieredStorage::ShardOpManager::ReportFetched(EntryId id, string_view value,
+                                                  tiering::DiskSegment segment, bool modified) {
+  ++stats_.total_fetches;
+
+  if (id == EntryId{kFragmentedBin}) {  // Generally we read whole bins only for defrag
+    Defragment(segment, value);
+    return true;  // delete
+  }
+
+  if (!modified && !cache_fetched_)
+    return false;
+
+  if (SliceSnapshot::IsSnaphotInProgress())
+    return false;
+
+  SetInMemory(get<OpManager::KeyRef>(id), value, segment);
+  return true;
+}
+
 TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_size)
     : op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
       bins_{make_unique<tiering::SmallBins>()} {
@@ -276,7 +281,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
     return false;
 
   // This invariant should always hold because ShouldStash tests for IoPending flag.
-  CHECK(!bins_->IsPending(dbid, key));
+  DCHECK(!bins_->IsPending(dbid, key));
 
   // TODO: When we are low on memory we should introduce a back-pressure, to avoid OOMs
   // with a lot of underutilized disk space.
@@ -310,9 +315,11 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
 
 void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
   DCHECK(value->IsExternal());
+  ++stats_.total_deletes;
+
   tiering::DiskSegment segment = value->GetExternalSlice();
-  op_manager_->Delete(segment);
-  op_manager_->SetInMemory(value, dbid, "", segment);
+  op_manager_->DeleteOffloaded(segment);
+  op_manager_->SetInMemory(value, dbid, string_view{}, segment);
 }
 
 void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
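On the accounting changes in this file: RecordAdded/RecordDeleted now take the stored length (size_t) rather than a whole DiskSegment, total_fetches is bumped once at the top of ReportFetched (so defrag reads are counted too), and total_deletes moved into TieredStorage's own stats. A minimal sketch of the symmetric bookkeeping, using a hypothetical Stats stand-in rather than the real DbTableStats:

```cpp
// Minimal sketch, assuming a hypothetical Stats type; mirrors the add/delete
// symmetry of RecordAdded/RecordDeleted after this change (lengths, not segments).
#include <cassert>
#include <cstddef>
#include <cstdint>

struct Stats {
  int64_t tiered_entries = 0;
  int64_t tiered_used_bytes = 0;
};

// Before a value is replaced by a disk segment: one more tiered entry,
// and the tiered byte gauge grows by the stored length.
void RecordAdded(Stats* s, size_t tiered_len) {
  s->tiered_entries++;
  s->tiered_used_bytes += static_cast<int64_t>(tiered_len);
}

// After a disk segment is replaced by an in-memory value again.
void RecordDeleted(Stats* s, size_t tiered_len) {
  s->tiered_entries--;
  s->tiered_used_bytes -= static_cast<int64_t>(tiered_len);
}

int main() {
  Stats s;
  RecordAdded(&s, 4096);
  RecordDeleted(&s, 4096);
  // Every add must be paired with a delete of the same length, otherwise the
  // gauges drift; the flush wait loop depends on tiered_entries reaching zero.
  assert(s.tiered_entries == 0 && s.tiered_used_bytes == 0);
}
```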
1 change: 1 addition & 0 deletions src/server/tiered_storage.h
@@ -85,6 +85,7 @@ class TieredStorage {
   unsigned write_depth_limit_ = 10;
   struct {
     uint64_t stash_overflow_cnt = 0;
+    uint64_t total_deletes = 0;
   } stats_;
 };
 
