Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshots overhaul #176

Merged
merged 53 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4b37854
Start on changing snapshot sizes
Shillaker Nov 18, 2021
ccfe652
Resizing
Shillaker Nov 18, 2021
5645712
Up to failing test
Shillaker Nov 18, 2021
86b7e2e
Tests for memory utils
Shillaker Nov 18, 2021
692e5d8
Fixing up tests
Shillaker Nov 18, 2021
808076c
Test for expanding and remapping snapshots
Shillaker Nov 18, 2021
ae6f3ac
Formatting
Shillaker Nov 18, 2021
d82ca4a
Checking for uninitialised fds
Shillaker Nov 18, 2021
3e10dbc
Test for pushing snapshots
Shillaker Nov 18, 2021
12f0f03
Merge branch 'master' into snap-sizes
Shillaker Dec 9, 2021
c311c82
Test fixes, formatting
Shillaker Dec 9, 2021
a050378
Add target to dev.sanitise
Shillaker Dec 9, 2021
09720bf
Half-way through refactor
Shillaker Dec 10, 2021
3c8273a
Small typos etc.
Shillaker Dec 10, 2021
f1ae79c
Continued refactor
Shillaker Dec 10, 2021
1f3fc82
Compiling
Shillaker Dec 10, 2021
2267dfd
Fixing up tests
Shillaker Dec 10, 2021
261a814
Fix tests
Shillaker Dec 10, 2021
6aaab19
Formatting
Shillaker Dec 10, 2021
4559377
Fixing up dist tests
Shillaker Dec 10, 2021
a0fdd8f
Fix failing asan test
Shillaker Dec 13, 2021
40256fd
Small tidy-up
Shillaker Dec 13, 2021
2669d3d
Typos
Shillaker Dec 13, 2021
bab12f1
PR comments WIP
Shillaker Dec 13, 2021
2535a3d
Remove vector/ span duplicates
Shillaker Dec 13, 2021
81f8c53
More tests
Shillaker Dec 14, 2021
a0b36a8
Adding private and shared snapshot mappings
Shillaker Dec 14, 2021
b542697
Compilation fixes
Shillaker Dec 14, 2021
3340e63
Sketching out memory view class
Shillaker Dec 14, 2021
d337f21
Follow through with MemoryView refactor
Shillaker Dec 14, 2021
f13b403
Fixing tests
Shillaker Dec 14, 2021
f1aff36
Fixing tests
Shillaker Dec 14, 2021
d9ce863
Tidying up
Shillaker Dec 14, 2021
b0a5ba8
Move merging logic into SnapshotData
Shillaker Dec 15, 2021
2e79ad3
Master writing back snapshot changes locally
Shillaker Dec 15, 2021
ce302a0
Failing distributed reduction test
Shillaker Dec 15, 2021
ea394a5
Formatting and tests
Shillaker Dec 15, 2021
bfc6aff
Adding repeats into dist test
Shillaker Dec 15, 2021
f6a49da
Fix up distributed test
Shillaker Dec 16, 2021
a2a0a0d
Tidy-up
Shillaker Dec 16, 2021
13c30c6
Override CPU count in dist tests
Shillaker Dec 16, 2021
d304a5c
Fix race condition in master snapshot
Shillaker Dec 17, 2021
d9ab090
Added locking log statement
Shillaker Dec 17, 2021
ec3fba6
Fix scheduler test
Shillaker Dec 17, 2021
195d386
Fix straggler error in tests
Shillaker Dec 17, 2021
2632dc9
Immutable snapshot diffs with ownership
Shillaker Dec 17, 2021
77ee625
Fix up broken tests
Shillaker Dec 17, 2021
0ae55a8
Fix compile error in dist tests
Shillaker Dec 17, 2021
ca0b823
Test fix-up
Shillaker Dec 17, 2021
9dc3902
Formatting
Shillaker Dec 17, 2021
0522c8e
Fix dist tests
Shillaker Dec 17, 2021
284889e
Rename _data param
Shillaker Dec 17, 2021
fff0f06
PR comments
Shillaker Dec 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Executor

void releaseClaim();

virtual std::shared_ptr<faabric::util::MemoryView> getMemoryView();
virtual faabric::util::MemoryView getMemoryView();

protected:
virtual void restore(faabric::Message& msg);
Expand Down
22 changes: 10 additions & 12 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class SnapshotDiff
uint32_t offsetIn,
std::span<const uint8_t> dataIn);

SnapshotDataType getDataType() const;
SnapshotDataType getDataType() const { return dataType; }

SnapshotMergeOperation getOperation() const;
SnapshotMergeOperation getOperation() const { return operation; }

uint32_t getOffset() const;
uint32_t getOffset() const { return offset; }

std::span<const uint8_t> getData() const;
std::span<const uint8_t> getData() const { return data; }

std::vector<uint8_t> getDataCopy() const;

Expand Down Expand Up @@ -114,13 +114,13 @@ class SnapshotData

size_t getQueuedDiffsCount();

void queueDiffs(const std::vector<SnapshotDiff>& diffs);
void queueDiffs(std::span<SnapshotDiff> diffs);

void writeQueuedDiffs();

size_t getSize();
size_t getSize() const { return size; }

size_t getMaxSize();
size_t getMaxSize() const { return maxSize; }

private:
size_t size = 0;
Expand Down Expand Up @@ -150,18 +150,16 @@ class MemoryView
public:
// Note - this object is just a view of a section of memory, and does not
// own the underlying data
explicit MemoryView(std::span<const uint8_t> dataIn);

MemoryView(const MemoryView&) = delete;
MemoryView() = default;

MemoryView& operator=(const MemoryView&) = delete;
explicit MemoryView(std::span<const uint8_t> dataIn);

std::vector<SnapshotDiff> getDirtyRegions();

std::vector<SnapshotDiff> diffWithSnapshot(
std::shared_ptr<SnapshotData> snap);

std::span<const uint8_t> getData();
std::span<const uint8_t> getData() { return data; }

private:
std::span<const uint8_t> data;
Expand Down
11 changes: 6 additions & 5 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ void Executor::threadPoolThread(int threadPoolIdx)
oldTaskCount - 1);

// Handle snapshot diffs _before_ we reset the executor
std::shared_ptr<faabric::util::MemoryView> funcMemory = getMemoryView();
if (funcMemory != nullptr && isLastInBatch && task.needsSnapshotSync) {
faabric::util::MemoryView funcMemory = getMemoryView();
if (!funcMemory.getData().empty() && isLastInBatch &&
task.needsSnapshotSync) {
auto snap = faabric::snapshot::getSnapshotRegistry().getSnapshot(
msg.snapshotkey());

Expand All @@ -300,7 +301,7 @@ void Executor::threadPoolThread(int threadPoolIdx)
// If we're on master, we write the diffs straight to the snapshot
// otherwise we push them to the master.
std::vector<faabric::util::SnapshotDiff> diffs =
funcMemory->diffWithSnapshot(snap);
funcMemory.diffWithSnapshot(snap);

if (isMaster) {
SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} on "
Expand Down Expand Up @@ -424,11 +425,11 @@ void Executor::postFinish() {}

void Executor::reset(faabric::Message& msg) {}

std::shared_ptr<faabric::util::MemoryView> Executor::getMemoryView()
faabric::util::MemoryView Executor::getMemoryView()
{
SPDLOG_WARN("Executor for {} has not implemented memory view method",
faabric::util::funcToString(boundMessage, false));
return nullptr;
return faabric::util::MemoryView();
}

void Executor::restore(faabric::Message& msg)
Expand Down
4 changes: 2 additions & 2 deletions src/snapshot/SnapshotRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ void SnapshotRegistry::registerSnapshotIfNotExists(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data)
{
doRegisterSnapshot(key, data, false);
doRegisterSnapshot(key, std::move(data), false);
}

void SnapshotRegistry::registerSnapshot(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data)
{
doRegisterSnapshot(key, data, true);
doRegisterSnapshot(key, std::move(data), true);
}

void SnapshotRegistry::doRegisterSnapshot(
Expand Down
1 change: 1 addition & 0 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize)

// Convert chunks to snapshot diff objects
std::vector<SnapshotDiff> diffs;
diffs.reserve(r->chunks()->size());
for (const auto* chunk : *r->chunks()) {
diffs.emplace_back(
static_cast<SnapshotDataType>(chunk->dataType()),
Expand Down
49 changes: 6 additions & 43 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,6 @@ SnapshotDiff::SnapshotDiff(SnapshotDataType dataTypeIn,
, data(dataIn.begin(), dataIn.end())
{}

SnapshotDataType SnapshotDiff::getDataType() const
{
return dataType;
}

SnapshotMergeOperation SnapshotDiff::getOperation() const
{
return operation;
}

uint32_t SnapshotDiff::getOffset() const
{
return offset;
}

std::span<const uint8_t> SnapshotDiff::getData() const
{
return data;
}

std::vector<uint8_t> SnapshotDiff::getDataCopy() const
{
return std::vector<uint8_t>(data.begin(), data.end());
Expand Down Expand Up @@ -238,7 +218,7 @@ size_t SnapshotData::getQueuedDiffsCount()
return queuedDiffs.size();
}

void SnapshotData::queueDiffs(const std::vector<SnapshotDiff>& diffs)
void SnapshotData::queueDiffs(const std::span<SnapshotDiff> diffs)
{
faabric::util::FullLock lock(snapMx);
for (const auto& diff : diffs) {
Expand Down Expand Up @@ -351,25 +331,14 @@ void SnapshotData::writeQueuedDiffs()
queuedDiffs.clear();
}

size_t SnapshotData::getSize()
{
return size;
}

size_t SnapshotData::getMaxSize()
{
return maxSize;
}

MemoryView::MemoryView(std::span<const uint8_t> dataIn)
: data(dataIn)
{}

std::vector<SnapshotDiff> MemoryView::getDirtyRegions()
{
if (data.empty()) {
std::vector<SnapshotDiff> empty;
return empty;
return {};
}

// Get dirty regions
Expand All @@ -382,13 +351,12 @@ std::vector<SnapshotDiff> MemoryView::getDirtyRegions()

// Convert to snapshot diffs
std::vector<SnapshotDiff> diffs;
for (auto& p : regions) {
size_t regionSize = p.second - p.first;
const uint8_t* regionData = data.data() + p.first;
diffs.reserve(regions.size());
for (auto [regionBegin, regionEnd] : regions) {
diffs.emplace_back(SnapshotDataType::Raw,
SnapshotMergeOperation::Overwrite,
p.first,
std::span<const uint8_t>(regionData, regionSize));
regionBegin,
data.subspan(regionBegin, regionEnd - regionBegin));
}

SPDLOG_DEBUG("Memory view has {}/{} dirty pages", diffs.size(), nPages);
Expand Down Expand Up @@ -441,11 +409,6 @@ std::vector<SnapshotDiff> MemoryView::diffWithSnapshot(
return diffs;
}

std::span<const uint8_t> MemoryView::getData()
{
return data;
}

std::string snapshotDataTypeStr(SnapshotDataType dt)
{
switch (dt) {
Expand Down
5 changes: 2 additions & 3 deletions tests/dist/DistTestExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ void DistTestExecutor::restore(faabric::Message& msg)
reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get());
}

std::shared_ptr<faabric::util::MemoryView> DistTestExecutor::getMemoryView()
faabric::util::MemoryView DistTestExecutor::getMemoryView()
{
return std::make_shared<faabric::util::MemoryView>(
std::span<uint8_t>(dummyMemory.get(), dummyMemorySize));
return faabric::util::MemoryView({ dummyMemory.get(), dummyMemorySize });
}

std::span<uint8_t> DistTestExecutor::getDummyMemory()
Expand Down
2 changes: 1 addition & 1 deletion tests/dist/DistTestExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DistTestExecutor final : public faabric::scheduler::Executor

void restore(faabric::Message& msg) override;

std::shared_ptr<faabric::util::MemoryView> getMemoryView() override;
faabric::util::MemoryView getMemoryView() override;

std::span<uint8_t> getDummyMemory();

Expand Down
2 changes: 0 additions & 2 deletions tests/dist/scheduler/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec,
faabric::snapshot::getSnapshotRegistry();

auto originalSnap = reg.getSnapshot(snapshotKey);
std::shared_ptr<faabric::util::MemoryView> funcMemory =
exec->getMemoryView();

// Add a single merge region to catch both diffs
int offsetA = 10;
Expand Down
5 changes: 2 additions & 3 deletions tests/test/scheduler/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ void TestExecutor::restore(faabric::Message& msg)
reg.mapSnapshot(msg.snapshotkey(), dummyMemory.get());
}

std::shared_ptr<faabric::util::MemoryView> TestExecutor::getMemoryView()
faabric::util::MemoryView TestExecutor::getMemoryView()
{
return std::make_shared<faabric::util::MemoryView>(
std::span<uint8_t>(dummyMemory.get(), dummyMemorySize));
return faabric::util::MemoryView({ dummyMemory.get(), dummyMemorySize });
}

int32_t TestExecutor::executeTask(
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class TestExecutor final : public faabric::scheduler::Executor

void restore(faabric::Message& msg) override;

std::shared_ptr<faabric::util::MemoryView> getMemoryView() override;
faabric::util::MemoryView getMemoryView() override;

int32_t executeTask(
int threadPoolIdx,
Expand Down