Skip to content

Commit

Permalink
Merge pull request cloudflare#1354 from justin-mp/log-actor-on-error
Browse files Browse the repository at this point in the history
Log Actor ID on V8 deserialization errors
  • Loading branch information
justin-mp authored Oct 27, 2023
2 parents f5b7120 + 2bdbe17 commit 7448549
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 57 deletions.
9 changes: 9 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,20 @@ wd_cc_capnp_library(
["**/*-test.c++"],
exclude = [
"api-rtti-test.c++",
"actor-state-iocontext-test.c++",
"cf-property-test.c++",
"node/*-test.c++",
],
)]

kj_test(
src = "actor-state-iocontext-test.c++",
deps = [
"//src/workerd/io",
"//src/workerd/tests:test-fixture",
]
)

kj_test(
src = "node/buffer-test.c++",
deps = ["//src/workerd/tests:test-fixture"],
Expand Down
96 changes: 96 additions & 0 deletions src/workerd/api/actor-state-iocontext-test.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2017-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#include <algorithm>

#include <kj/test.h>
#include <kj/encoding.h>

#include <workerd/api/actor-state.h>
#include <workerd/tests/test-fixture.h>

namespace workerd::api {
namespace {

using workerd::TestFixture;

bool contains(kj::StringPtr haystack, kj::StringPtr needle) {
return std::search(haystack.begin(), haystack.end(), needle.begin(), needle.end())
!= haystack.end();
}

class MockActorId : public ActorIdFactory::ActorId {
public:
MockActorId(kj::String id) : id(kj::mv(id)) {}
kj::String toString() const override {
return kj::str("MockActorId<", id, ">");
}

kj::Maybe<kj::StringPtr> getName() const override {
return kj::none;
}

bool equals(const ActorId& other) const override {
return false;
}

kj::Own<ActorId> clone() const override {
return kj::heap<MockActorId>(kj::heapString(id));
}

virtual ~MockActorId() {};
private:
kj::String id;
};

void runBadDeserialization(jsg::Lock& lock, kj::StringPtr expectedId) {
// FF = kVersion token, 0E = version 15, 06 = an unknown tag value
kj::StringPtr invalidV8Hex = "FF0E06"_kj;
auto invalidV8Value = kj::decodeHex(invalidV8Hex.asArray());
try {
deserializeV8Value(lock, "some-key"_kj, invalidV8Value);
KJ_FAIL_ASSERT("deserializeV8Value should have failed.");
} catch (kj::Exception& ex) {
if (ex.getDescription().startsWith("actor storage deserialization failed")) {
KJ_ASSERT(contains(ex.getDescription(), expectedId));
} else {
throw;
}
}
}

void runBadDeserializationInIoContext(TestFixture& fixture, kj::StringPtr expectedId) {
fixture.runInIoContext(
[expectedId](const workerd::TestFixture::Environment& env) -> kj::Promise<void> {
runBadDeserialization(env.lock, expectedId);
return kj::READY_NOW;
});
}

// TODO(maybe) It would be nice to have a test that tests the case when there's no IoContext,
// but that's a royal pain to set up in this test file we'd basically only test that we don't
// crash, which the actor-state-test.c++ does for us.

KJ_TEST("no actor specified") {
TestFixture fixture;
runBadDeserializationInIoContext(fixture, "actorId = ;"_kj);
}

KJ_TEST("actor specified with string id") {
Worker::Actor::Id id = kj::str("testActorId");
TestFixture fixture(TestFixture::SetupParams{.actorId = kj::mv(id)});
runBadDeserializationInIoContext(fixture, "actorId = testActorId;"_kj);
}

KJ_TEST("actor specified with ActorId object") {
kj::Own<ActorIdFactory::ActorId> mockActorId = kj::heap<MockActorId>(kj::str("testActorId"));
Worker::Actor::Id id = kj::mv(mockActorId);
TestFixture fixture(TestFixture::SetupParams{
.actorId = kj::mv(id),
});
runBadDeserializationInIoContext(fixture, "actorId = MockActorId<testActorId>;"_kj);
}

} // namespace
} // namespace workerd::api
22 changes: 21 additions & 1 deletion src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,25 @@ kj::Promise<void> updateStorageDeletes(IoContext& context,
metrics.addStorageDeletes(deleted);
};

// Return the id of the current actor (or the empty string if there is no current actor).
kj::Maybe<kj::String> getCurrentActorId() {
if (IoContext::hasCurrent()) {
IoContext& ioContext = IoContext::current();
KJ_IF_SOME(actor, ioContext.getActor()) {
KJ_SWITCH_ONEOF(actor.getId()) {
KJ_CASE_ONEOF(s, kj::String) {
return kj::heapString(s);
}
KJ_CASE_ONEOF(actorId, kj::Own<ActorIdFactory::ActorId>) {
return actorId->toString();
}
}
KJ_UNREACHABLE;
}
}
return kj::none;
}

} // namespace

jsg::Promise<jsg::JsRef<jsg::JsValue>> DurableObjectStorageOperations::get(
Expand Down Expand Up @@ -959,9 +978,10 @@ jsg::JsValue deserializeV8Value(jsg::Lock& js,
// include the key (to help find the data in the database if it hasn't been deleted), the
// length of the value, and the first three bytes of the value (which is just the v8-internal
// version header and the tag that indicates the type of the value, but not its contents).
kj::String actorId = getCurrentActorId().orDefault([]() { return kj::str(); });
KJ_FAIL_ASSERT("actor storage deserialization failed",
"failed to deserialize stored value",
exception.getHandle(js), key, buf.size(),
actorId, exception.getHandle(js), key, buf.size(),
buf.slice(0, std::min(static_cast<size_t>(3), buf.size())));
});
} catch (jsg::JsExceptionThrown&) {
Expand Down
47 changes: 0 additions & 47 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -115,53 +115,6 @@ static kj::Vector<char> escapeJsonString(kj::StringPtr text) {
return escaped;
}

// An ActorStorage implementation which will always respond to reads as if the state is empty,
// and will fail any writes.
class EmptyReadOnlyActorStorageImpl final: public rpc::ActorStorage::Stage::Server {
public:
kj::Promise<void> get(GetContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> getMultiple(GetMultipleContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> list(ListContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> getAlarm(GetAlarmContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> txn(TxnContext context) override {
auto results = context.getResults(capnp::MessageSize {2, 1});
results.setTransaction(kj::heap<TransactionImpl>());
return kj::READY_NOW;
}

private:
class TransactionImpl final: public rpc::ActorStorage::Stage::Transaction::Server {
protected:
kj::Promise<void> get(GetContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> getMultiple(GetMultipleContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> list(ListContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> getAlarm(GetAlarmContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> commit(CommitContext context) override {
return kj::READY_NOW;
}
};
};

} // namespace

// =======================================================================================
Expand Down
47 changes: 47 additions & 0 deletions src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,51 @@ class Server: private kj::TaskSet::ErrorHandler {
kj::ForkedPromise<void>& forkedDrainWhen);
};

// An ActorStorage implementation which will always respond to reads as if the state is empty,
// and will fail any writes.
class EmptyReadOnlyActorStorageImpl final: public rpc::ActorStorage::Stage::Server {
public:
kj::Promise<void> get(GetContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> getMultiple(GetMultipleContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> list(ListContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> getAlarm(GetAlarmContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> txn(TxnContext context) override {
auto results = context.getResults(capnp::MessageSize {2, 1});
results.setTransaction(kj::heap<TransactionImpl>());
return kj::READY_NOW;
}

private:
class TransactionImpl final: public rpc::ActorStorage::Stage::Transaction::Server {
protected:
kj::Promise<void> get(GetContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> getMultiple(GetMultipleContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> list(ListContext context) override {
return context.getParams().getStream().endRequest(capnp::MessageSize {2, 0})
.send().ignoreResult();
}
kj::Promise<void> getAlarm(GetAlarmContext context) override {
return kj::READY_NOW;
}
kj::Promise<void> commit(CommitContext context) override {
return kj::READY_NOW;
}
};
};

} // namespace workerd::server
2 changes: 1 addition & 1 deletion src/workerd/tests/bench-global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct GlobalScopeBenchmark: public benchmark::Fixture {
},
};
)"_kj};
fixture = kj::heap<TestFixture>(params);
fixture = kj::heap<TestFixture>(kj::mv(params));
}

void TearDown(benchmark::State& state) noexcept(true) override {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/bench-regex.c++
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct RegExpBenchmark: public benchmark::Fixture {
}
)"_kj
};
fixture = kj::heap<TestFixture>(params);
fixture = kj::heap<TestFixture>(kj::mv(params));
}

void TearDown(benchmark::State& state) noexcept(true) override {
Expand Down
41 changes: 37 additions & 4 deletions src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

#include <algorithm>

#include <workerd/api/actor-state.h>
#include <workerd/api/global-scope.h>
#include <workerd/io/actor-cache.h>
#include <workerd/io/io-channels.h>
#include <workerd/io/limit-enforcer.h>
#include <workerd/io/observer.h>
#include <workerd/io/worker-entrypoint.h>
#include <workerd/jsg/modules.h>
#include <workerd/server/server.h>
#include <workerd/server/workerd-api.h>
#include <workerd/util/stream-utils.h>

Expand Down Expand Up @@ -230,11 +232,23 @@ struct MockResponse final: public kj::HttpService::Response {
KJ_FAIL_REQUIRE("NOT SUPPORTED");
}
};

class MockActorLoopback : public Worker::Actor::Loopback, public kj::Refcounted {
public:
virtual kj::Own<WorkerInterface> getWorker(IoChannelFactory::SubrequestMetadata metadata) {
return kj::Own<WorkerInterface>();
};

virtual kj::Own<Worker::Actor::Loopback> addRef() {
return kj::addRef(*this);
};
};

} // namespace


TestFixture::TestFixture(SetupParams params)
: params(params),
TestFixture::TestFixture(SetupParams&& params)
: waitScope(params.waitScope),
config(buildConfig(params, configArena)),
io(params.waitScope == kj::none ? kj::Maybe(kj::setupAsyncIo()) : kj::Maybe<kj::AsyncIoContext>(kj::none)),
timer(kj::heap<MockTimer>()),
Expand Down Expand Up @@ -275,7 +289,26 @@ TestFixture::TestFixture(SetupParams params)
)),
errorHandler(kj::heap<DummyErrorHandler>()),
waitUntilTasks(*errorHandler),
headerTable(headerTableBuilder.build()) { }
headerTable(headerTableBuilder.build()) {
KJ_IF_SOME(id, params.actorId) {
auto lock = Worker::Lock(*worker, Worker::Lock::TakeSynchronously(kj::none));
auto makeActorCache = [](
const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, ActorCache::Hooks& hooks) {
return kj::heap<ActorCache>(
kj::heap<server::EmptyReadOnlyActorStorageImpl>(), sharedLru, outputGate, hooks);
};
auto makeStorage = [](
jsg::Lock& js, const Worker::ApiIsolate& apiIsolate, ActorCacheInterface& actorCache)
-> jsg::Ref<api::DurableObjectStorage> {
return jsg::alloc<api::DurableObjectStorage>(
IoContext::current().addObject(actorCache));
};
actor = kj::refcounted<Worker::Actor>(
*worker, /*tracker=*/kj::none, kj::mv(id), /*hasTransient=*/false, makeActorCache,
/*classname=*/kj::none, makeStorage, lock, kj::refcounted<MockActorLoopback>(),
*timerChannel, kj::refcounted<ActorObserver>(), kj::none, kj::none);
}
}

void TestFixture::runInIoContext(
kj::Function<kj::Promise<void>(const Environment&)>&& callback,
Expand Down Expand Up @@ -310,7 +343,7 @@ void TestFixture::runInIoContext(

kj::Own<IoContext::IncomingRequest> TestFixture::createIncomingRequest() {
auto context = kj::refcounted<IoContext>(
threadContext, kj::atomicAddRef(*worker), nullptr, kj::heap<MockLimitEnforcer>());
threadContext, kj::atomicAddRef(*worker), actor, kj::heap<MockLimitEnforcer>());
auto incomingRequest = kj::heap<IoContext::IncomingRequest>(
kj::addRef(*context), kj::heap<DummyIoChannelFactory>(*timerChannel),
kj::refcounted<RequestObserver>(), nullptr);
Expand Down
9 changes: 6 additions & 3 deletions src/workerd/tests/test-fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ struct TestFixture {
kj::Maybe<kj::WaitScope&> waitScope;
kj::Maybe<CompatibilityFlags::Reader> featureFlags;
kj::Maybe<kj::StringPtr> mainModuleSource;
// If set, make a stub of an Actor with the given id.
kj::Maybe<Worker::Actor::Id> actorId;
};

TestFixture(SetupParams params = { });
TestFixture(SetupParams&& params = { });

struct V8Environment {
v8::Isolate* isolate;
Expand All @@ -50,7 +52,7 @@ struct TestFixture {
-> typename RunReturnType<decltype(callback(kj::instance<const Environment&>()))>::Type {
auto request = createIncomingRequest();
kj::WaitScope* waitScope;
KJ_IF_SOME(ws, params.waitScope) {
KJ_IF_SOME(ws, this->waitScope) {
waitScope = &ws;
} else {
waitScope = &KJ_REQUIRE_NONNULL(io).waitScope;
Expand Down Expand Up @@ -80,14 +82,15 @@ struct TestFixture {
Response runRequest(kj::HttpMethod method, kj::StringPtr url, kj::StringPtr body);

private:
SetupParams params;
kj::Maybe<kj::WaitScope&> waitScope;
capnp::MallocMessageBuilder configArena;
workerd::server::config::Worker::Reader config;
kj::Maybe<kj::AsyncIoContext> io;
capnp::MallocMessageBuilder workerBundleArena;
kj::Own<kj::Timer> timer;
kj::Own<TimerChannel> timerChannel;
kj::Own<kj::EntropySource> entropySource;
kj::Maybe<kj::Own<Worker::Actor>> actor;
capnp::ByteStreamFactory byteStreamFactory;
kj::HttpHeaderTable::Builder headerTableBuilder;
ThreadContext::HeaderIdBundle threadContextHeaderBundle;
Expand Down

0 comments on commit 7448549

Please sign in to comment.