Skip to content

Commit

Permalink
tsgrpc_kvstore: Fix missing kvstore::List strip_prefix_length and sta…
Browse files Browse the repository at this point in the history
…leness_bound

Improve test by only starting external tsgrpc server once.
Add tsgrpc_kvstore verbose logging

PiperOrigin-RevId: 616950380
Change-Id: I423771c19795276595462b26bb6d94adcf56b61f
  • Loading branch information
laramiel authored and copybara-github committed Mar 18, 2024
1 parent aab2df4 commit 652f20f
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ startup --windows_enable_symlinks
# Windows has path-length limits of 260 characters leading to errors which
# may require a shorter filenames. If encountered, uncomment and adjust:
#
# startup --output_base=C:\Out
# startup --output_base=C:\\Out

# By default assume MSVC on Windows.
build:windows --config=msvc
Expand Down
9 changes: 9 additions & 0 deletions tensorstore/kvstore/memory/memory_key_value_store_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ TEST(MemoryKeyValueStoreTest, Open) {
}
}

TEST(MemoryKeyValueStoreTest, ListWithPath) {
auto context = Context::Default();
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto store,
kvstore::Open({{"driver", "memory"}, {"path", "p/"}}, context).result());

tensorstore::internal::TestKeyValueStoreList(store);
}

TEST(MemoryKeyValueStoreTest, SpecRoundtrip) {
tensorstore::internal::KeyValueStoreSpecRoundtripOptions options;
options.full_spec = {
Expand Down
11 changes: 8 additions & 3 deletions tensorstore/kvstore/tsgrpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@ tensorstore_cc_library(
"//tensorstore/internal/json_binding",
"//tensorstore/internal/json_binding:absl_time",
"//tensorstore/internal/json_binding:bindable",
"//tensorstore/internal/log:verbose_flag",
"//tensorstore/internal/metrics",
"//tensorstore/kvstore",
"//tensorstore/kvstore:byte_range",
"//tensorstore/kvstore:generation",
"//tensorstore/kvstore:key_range",
"//tensorstore/proto:encode_time",
"//tensorstore/proto:proto_util",
"//tensorstore/serialization:absl_time",
"//tensorstore/util:executor",
"//tensorstore/util:future",
Expand All @@ -111,6 +113,7 @@ tensorstore_cc_library(
"//tensorstore/util/garbage_collection",
"@com_github_grpc_grpc//:grpc++",
"@com_github_grpc_grpc//:grpc++_public_hdrs",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:cord",
Expand All @@ -135,8 +138,6 @@ tensorstore_cc_test(
"//tensorstore/kvstore:test_util",
"//tensorstore/proto:parse_text_proto_or_die",
"//tensorstore/proto:protobuf_matchers",
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"//tensorstore/util/execution",
"//tensorstore/util/execution:sender_testutil",
Expand Down Expand Up @@ -164,12 +165,14 @@ tensorstore_cc_library(
"//tensorstore/internal/grpc:server_credentials",
"//tensorstore/internal/json_binding",
"//tensorstore/internal/json_binding:bindable",
"//tensorstore/internal/log:verbose_flag",
"//tensorstore/internal/metrics",
"//tensorstore/kvstore",
"//tensorstore/kvstore:byte_range",
"//tensorstore/kvstore:generation",
"//tensorstore/kvstore:key_range",
"//tensorstore/proto:encode_time",
"//tensorstore/proto:proto_util",
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:span",
Expand All @@ -178,6 +181,7 @@ tensorstore_cc_library(
"@com_github_grpc_grpc//:grpc++",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/synchronization",
Expand All @@ -191,6 +195,7 @@ tensorstore_cc_test(
":kvstore_server",
":tsgrpc",
"//tensorstore:context",
"//tensorstore/internal/http:transport_test_utils",
"//tensorstore/kvstore",
"//tensorstore/kvstore:key_range",
"//tensorstore/kvstore:test_matchers",
Expand All @@ -199,12 +204,12 @@ tensorstore_cc_test(
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"//tensorstore/util:str_cat",
"//tensorstore/util/execution",
"//tensorstore/util/execution:sender_testutil",
"@com_github_grpc_grpc//:grpc++",
"@com_github_nlohmann_json//:nlohmann_json",
"@com_google_absl//absl/strings:cord",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
"@com_google_googletest//:gtest_main",
],
Expand Down
5 changes: 5 additions & 0 deletions tensorstore/kvstore/tsgrpc/kvstore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,12 @@ message DeleteResponse {
/// kvstore::ListOptions
message ListRequest {
KeyRange range = 1;

/// Length of prefix to strip from keys.
uint64 strip_prefix_length = 2;

/// Staleness bound on list results.
google.protobuf.Timestamp staleness_bound = 3;
}

message ListResponse {
Expand Down
20 changes: 20 additions & 0 deletions tensorstore/kvstore/tsgrpc/kvstore_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include <utility>
#include <vector>

#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/absl_log.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/synchronization/mutex.h"
Expand All @@ -42,6 +44,7 @@
#include "tensorstore/internal/json_binding/bindable.h"
#include "tensorstore/internal/json_binding/json_binding.h"
#include "tensorstore/internal/json_binding/std_array.h"
#include "tensorstore/internal/log/verbose_flag.h"
#include "tensorstore/internal/metrics/counter.h"
#include "tensorstore/internal/path.h"
#include "tensorstore/kvstore/byte_range.h"
Expand All @@ -54,6 +57,7 @@
#include "tensorstore/kvstore/tsgrpc/common.pb.h"
#include "tensorstore/kvstore/tsgrpc/handler_template.h"
#include "tensorstore/proto/encode_time.h"
#include "tensorstore/proto/proto_util.h"
#include "tensorstore/util/execution/any_receiver.h"
#include "tensorstore/util/execution/execution.h"
#include "tensorstore/util/future.h"
Expand Down Expand Up @@ -96,6 +100,8 @@ auto& delete_metric = internal_metrics::Counter<int64_t>::New(
auto& list_metric = internal_metrics::Counter<int64_t>::New(
"/tensorstore/kvstore/grpc_server/list", "KvStoreService::List calls");

ABSL_CONST_INIT internal_log::VerboseFlag verbose_logging("tsgrpc_kvstore");

class ReadHandler final : public Handler<ReadRequest, ReadResponse> {
using Base = Handler<ReadRequest, ReadResponse>;

Expand All @@ -105,6 +111,8 @@ class ReadHandler final : public Handler<ReadRequest, ReadResponse> {
: Base(grpc_context, request, response), kvstore_(std::move(kvstore)) {}

void Run() {
ABSL_LOG_IF(INFO, verbose_logging)
<< "ReadHandler " << ConciseDebugString(*request());
kvstore::ReadOptions options{};
options.if_equal.value = request()->generation_if_equal();
options.if_not_equal.value = request()->generation_if_not_equal();
Expand Down Expand Up @@ -171,6 +179,8 @@ class WriteHandler final : public Handler<WriteRequest, WriteResponse> {
: Base(grpc_context, request, response), kvstore_(std::move(kvstore)) {}

void Run() {
ABSL_LOG_IF(INFO, verbose_logging)
<< "WriteHandler " << ConciseDebugString(*request());
tensorstore::kvstore::WriteOptions options{};
options.if_equal.value = request()->generation_if_equal();

Expand Down Expand Up @@ -216,6 +226,8 @@ class DeleteHandler final : public Handler<DeleteRequest, DeleteResponse> {
: Base(grpc_context, request, response), kvstore_(std::move(kvstore)) {}

void Run() {
ABSL_LOG_IF(INFO, verbose_logging)
<< "DeleteHandler " << ConciseDebugString(*request());
internal::IntrusivePtr<DeleteHandler> self{this};
auto callback = [self = std::move(self)](Promise<void> promise,
auto del_result) {
Expand Down Expand Up @@ -396,10 +408,18 @@ class ListHandler final : public StreamHandler<ListRequest, ListResponse> {
};

void ListHandler::Run() {
ABSL_LOG_IF(INFO, verbose_logging)
<< "ListHandler " << ConciseDebugString(*request());

tensorstore::kvstore::ListOptions options;
options.range = tensorstore::KeyRange(request()->range().inclusive_min(),
request()->range().exclusive_max());
options.strip_prefix_length = request()->strip_prefix_length();
if (request()->has_staleness_bound()) {
TENSORSTORE_ASSIGN_OR_RETURN(
options.staleness_bound,
internal::ProtoToAbslTime(request()->staleness_bound()), Finish(_));
}

internal::IntrusivePtr<ListHandler> self{this};
tensorstore::execution::submit(
Expand Down
84 changes: 54 additions & 30 deletions tensorstore/kvstore/tsgrpc/kvstore_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/cord.h"
#include "absl/strings/str_format.h"
#include "absl/synchronization/notification.h"
#include <nlohmann/json.hpp>
#include "tensorstore/context.h"
#include "tensorstore/kvstore/driver.h"
#include "tensorstore/kvstore/key_range.h"
#include "tensorstore/kvstore/kvstore.h"
#include "tensorstore/kvstore/operations.h"
Expand All @@ -37,7 +37,6 @@
#include "tensorstore/util/future.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status_testutil.h"
#include "tensorstore/util/str_cat.h"

namespace {

Expand All @@ -46,41 +45,60 @@ using ::tensorstore::KeyRange;
using ::tensorstore::grpc_kvstore::KvStoreServer;
using ::tensorstore::internal::MatchesKvsReadResultNotFound;

// This test uses the memory kvstore driver over grpc, so the test
// assertions should closely mimic those in the memory kvstore driver.
class KvStoreTest : public testing::Test {
class KvStoreSingleton {
public:
KvStoreTest()
: ctx_(tensorstore::Context::Default()),
server_(
KvStoreServer::Start(KvStoreServer::Spec::FromJson( //
{
{"bind_addresses", {"localhost:0"}},
{"base", "memory://prefix"},
})
.value(),
ctx_)
.value()) {}

tensorstore::KvStore OpenStore() {
auto address = tensorstore::StrCat("localhost:", server_.port());
return tensorstore::kvstore::Open(
{{"driver", "tsgrpc_kvstore"}, {"address", address}}, ctx_)
.value();
KvStoreSingleton() : ctx_(tensorstore::Context::Default()) {
server_ = KvStoreServer::Start(KvStoreServer::Spec::FromJson( //
{
{"bind_addresses", {"localhost:0"}},
{"base", "memory://x"},
})
.value(),
ctx_)
.value();
address_ = absl::StrFormat("localhost:%d", server_.port());
}

const std::string& address() const { return address_; }

private:
tensorstore::Context ctx_;
KvStoreServer server_;
std::string address_;
};

const KvStoreSingleton& GetSingleton() {
static const KvStoreSingleton* const kSingleton = new KvStoreSingleton();
return *kSingleton;
}

// This test uses the memory kvstore driver over grpc, so the test
// assertions should closely mimic those in the memory kvstore driver.
class KvStoreTest : public testing::Test {
public:
const std::string& address() const { return GetSingleton().address(); }
};

TEST_F(KvStoreTest, Basic) {
auto store = OpenStore();
auto context = tensorstore::Context::Default();
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto store, tensorstore::kvstore::Open({{"driver", "tsgrpc_kvstore"},
{"address", address()},
{"path", "basic/"}},
context)
.result());

tensorstore::internal::TestKeyValueReadWriteOps(store);
}

TEST_F(KvStoreTest, DeleteRange) {
auto store = OpenStore();
auto context = tensorstore::Context::Default();
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto store, tensorstore::kvstore::Open({{"driver", "tsgrpc_kvstore"},
{"address", address()},
{"path", "delete_range/"}},
context)
.result());

TENSORSTORE_EXPECT_OK(kvstore::Write(store, "a/b", absl::Cord("xyz")));
TENSORSTORE_EXPECT_OK(kvstore::Write(store, "a/d", absl::Cord("xyz")));
Expand All @@ -105,13 +123,19 @@ TEST_F(KvStoreTest, DeleteRange) {
}

TEST_F(KvStoreTest, List) {
auto store = OpenStore();
auto context = tensorstore::Context::Default();
TENSORSTORE_ASSERT_OK_AND_ASSIGN(
auto store, tensorstore::kvstore::Open({{"driver", "tsgrpc_kvstore"},
{"address", address()},
{"path", "list/"}},
context)
.result());

{
std::vector<std::string> log;
absl::Notification notification;
tensorstore::execution::submit(
store.driver->List({}),
kvstore::List(store, {}),
tensorstore::CompletionNotifyingReceiver{
&notification, tensorstore::LoggingReceiver{&log}});
notification.WaitForNotification();
Expand All @@ -131,7 +155,7 @@ TEST_F(KvStoreTest, List) {
std::vector<std::string> log;
absl::Notification notification;
tensorstore::execution::submit(
store.driver->List({}),
kvstore::List(store, {}),
tensorstore::CompletionNotifyingReceiver{
&notification, tensorstore::LoggingReceiver{&log}});

Expand All @@ -148,7 +172,7 @@ TEST_F(KvStoreTest, List) {
std::vector<std::string> log;
absl::Notification notification;
tensorstore::execution::submit(
store.driver->List({KeyRange::Prefix("a/c/")}),
kvstore::List(store, {KeyRange::Prefix("a/c/")}),
tensorstore::CompletionNotifyingReceiver{
&notification, tensorstore::LoggingReceiver{&log}});

Expand All @@ -164,7 +188,7 @@ TEST_F(KvStoreTest, List) {
std::vector<std::string> log;
absl::Notification notification;
tensorstore::execution::submit(
store.driver->List({}),
kvstore::List(store, {}),
tensorstore::CompletionNotifyingReceiver{
&notification, tensorstore::CancelOnStartingReceiver{{&log}}});

Expand All @@ -178,7 +202,7 @@ TEST_F(KvStoreTest, List) {
std::vector<std::string> log;
absl::Notification notification;
tensorstore::execution::submit(
store.driver->List({}),
kvstore::List(store, {}),
tensorstore::CompletionNotifyingReceiver{
&notification, tensorstore::CancelAfterNReceiver<2>{{&log}}});

Expand Down
Loading

0 comments on commit 652f20f

Please sign in to comment.