Skip to content

Commit

Permalink
more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
majetideepak committed Oct 27, 2024
1 parent 3efec9f commit 2c7c5d8
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,30 @@

#include "velox/connectors/hive/storage_adapters/gcs/GCSFileSystem.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/config/Config.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h"
#include "velox/connectors/hive/storage_adapters/gcs/tests/GCSTestbench.h"
#include "velox/exec/tests/utils/TempFilePath.h"

#include "gtest/gtest.h"

namespace facebook::velox::filesystems {
namespace {
class GCSFileSystemTest : public testing::Test {
protected:
static void SetUpTestSuite() {
if (testbench_ == nullptr) {
testbench_ = std::make_shared<GcsTestbench>();
testbench_->bootstrap();
}
public:
void SetUp() {
testbench_ = std::make_shared<GCSTestbench>();
testbench_->bootstrap();
}

std::shared_ptr<const config::ConfigBase> testGcsOptions() const {
std::unordered_map<std::string, std::string> configOverride = {};

configOverride["hive.gcs.scheme"] = "http";
configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port();
return std::make_shared<const config::ConfigBase>(
std::move(configOverride));
}

static std::shared_ptr<GcsTestbench> testbench_;
std::shared_ptr<GCSTestbench> testbench_;
};

std::shared_ptr<GcsTestbench> GCSFileSystemTest::testbench_ = nullptr;

TEST_F(GCSFileSystemTest, readFile) {
const auto gcsFile = gcsURI(
testbench_->preexistingBucketName(), testbench_->preexistingObjectName());

filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
auto readFile = gcfs.openFileForRead(gcsFile);
std::int64_t size = readFile->size();
Expand Down Expand Up @@ -90,7 +76,7 @@ TEST_F(GCSFileSystemTest, writeAndReadFile) {
const std::string_view newFile = "readWriteFile.txt";
const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile);

filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
auto writeFile = gcfs.openFileForWrite(gcsFile);
std::string dataContent =
Expand All @@ -114,23 +100,19 @@ TEST_F(GCSFileSystemTest, writeAndReadFile) {
std::int64_t size = readFile->size();
EXPECT_EQ(readFile->size(), contentSize);
EXPECT_EQ(readFile->pread(0, size), dataContent);
}

TEST_F(GCSFileSystemTest, openExistingFileForWrite) {
const std::string_view newFile = "readWriteFile.txt";
const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), newFile);

filesystems::GCSFileSystem gcfs(testGcsOptions());
gcfs.initializeClient();
VELOX_ASSERT_THROW(gcfs.openFileForWrite(gcsFile), "File already exists");
// Opening an existing file for write must be an error.
filesystems::GCSFileSystem newGcfs(testbench_->hiveConfig());
newGcfs.initializeClient();
VELOX_ASSERT_THROW(newGcfs.openFileForWrite(gcsFile), "File already exists");
}

TEST_F(GCSFileSystemTest, renameNotImplemented) {
const std::string_view file = "newTest.txt";
const auto gcsExistingFile = gcsURI(
testbench_->preexistingBucketName(), testbench_->preexistingObjectName());
const auto gcsNewFile = gcsURI(testbench_->preexistingBucketName(), file);
filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
gcfs.openFileForRead(gcsExistingFile);
VELOX_ASSERT_THROW(
Expand All @@ -141,7 +123,7 @@ TEST_F(GCSFileSystemTest, renameNotImplemented) {
TEST_F(GCSFileSystemTest, mkdirNotImplemented) {
const std::string_view dir = "newDirectory";
const auto gcsNewDirectory = gcsURI(testbench_->preexistingBucketName(), dir);
filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
VELOX_ASSERT_THROW(
gcfs.mkdir(gcsNewDirectory), "mkdir for GCS not implemented");
Expand All @@ -150,15 +132,15 @@ TEST_F(GCSFileSystemTest, mkdirNotImplemented) {
TEST_F(GCSFileSystemTest, rmdirNotImplemented) {
const std::string_view dir = "Directory";
const auto gcsDirectory = gcsURI(testbench_->preexistingBucketName(), dir);
filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
VELOX_ASSERT_THROW(gcfs.rmdir(gcsDirectory), "rmdir for GCS not implemented");
}

TEST_F(GCSFileSystemTest, missingFile) {
const std::string_view file = "newTest.txt";
const auto gcsFile = gcsURI(testbench_->preexistingBucketName(), file);
filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
VELOX_ASSERT_RUNTIME_THROW_CODE(
gcfs.openFileForRead(gcsFile),
Expand All @@ -167,7 +149,7 @@ TEST_F(GCSFileSystemTest, missingFile) {
}

TEST_F(GCSFileSystemTest, missingBucket) {
filesystems::GCSFileSystem gcfs(testGcsOptions());
filesystems::GCSFileSystem gcfs(testbench_->hiveConfig());
gcfs.initializeClient();
const std::string_view gcsFile = "gs://dummy/foo.txt";
VELOX_ASSERT_RUNTIME_THROW_CODE(
Expand All @@ -177,14 +159,12 @@ TEST_F(GCSFileSystemTest, missingBucket) {
}

TEST_F(GCSFileSystemTest, credentialsConfig) {
std::unordered_map<std::string, std::string> configOverride = {};

// credentials from arrow gcsfs test case
// While this service account key has the correct format, it cannot be used
// for authentication because the key has been deactivated on the server-side,
// *and* the account(s) involved are deleted *and* they are not the accounts
// or projects do not match its contents.
auto creds = R"""({
const std::string_view kCreds = R"""({
"type": "service_account",
"project_id": "foo-project",
"private_key_id": "a1a111aa1111a11a11a11aa111a111a1a1111111",
Expand Down Expand Up @@ -220,28 +200,19 @@ TEST_F(GCSFileSystemTest, credentialsConfig) {
})""";
auto jsonFile = exec::test::TempFilePath::create();
std::ofstream credsOut(jsonFile->getPath());
credsOut << creds;
credsOut << kCreds;
credsOut.close();
configOverride["hive.gcs.json-key-file-path"] = jsonFile->getPath();
configOverride["hive.gcs.scheme"] = "http";
configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port();
std::shared_ptr<const config::ConfigBase> conf =
std::make_shared<const config::ConfigBase>(std::move(configOverride));

filesystems::GCSFileSystem gcfs(conf);
std::unordered_map<std::string, std::string> configOverride = {
{"hive.gcs.json-key-file-path", jsonFile->getPath()}};
auto hiveConfig = testbench_->hiveConfig(configOverride);

filesystems::GCSFileSystem gcfs(hiveConfig);
gcfs.initializeClient();
try {
const std::string gcsFile = gcsURI(
testbench_->preexistingBucketName(),
testbench_->preexistingObjectName());
gcfs.openFileForRead(gcsFile);
FAIL() << "Expected VeloxException";
} catch (VeloxException const& err) {
EXPECT_THAT(
err.message(), testing::HasSubstr("gs://test1-gcs/test-object-name"));
EXPECT_THAT(
err.message(), testing::HasSubstr("Invalid ServiceAccountCredentials"));
}
const auto gcsFile = gcsURI(
testbench_->preexistingBucketName(), testbench_->preexistingObjectName());
VELOX_ASSERT_THROW(
gcfs.openFileForRead(gcsFile), "Invalid ServiceAccountCredentials");
}
} // namespace
} // namespace facebook::velox::filesystems
27 changes: 8 additions & 19 deletions velox/connectors/hive/storage_adapters/gcs/tests/GCSInsertTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <gtest/gtest.h>

#include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/tests/GcsTestbench.h"
#include "velox/connectors/hive/storage_adapters/gcs/tests/GCSTestbench.h"
#include "velox/connectors/hive/storage_adapters/test_common/InsertTest.h"

using namespace facebook::velox::exec::test;
Expand All @@ -31,20 +31,20 @@ class GCSInsertTest : public testing::Test, public test::InsertTest {
static void SetUpTestSuite() {
registerGCSFileSystem();
memory::MemoryManager::testingSetInstance({});
if (testbench_ == nullptr) {
testbench_ = std::make_shared<GcsTestbench>();
testbench_->bootstrap();
}
}

void SetUp() override {
connector::registerConnectorFactory(
std::make_shared<connector::hive::HiveConnectorFactory>());
testbench_ = std::make_shared<GCSTestbench>();
testbench_->bootstrap();
auto hiveConnector =
connector::getConnectorFactory(
connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(
exec::test::kHiveConnectorId, gcsOptions(), ioExecutor_.get());
exec::test::kHiveConnectorId,
testbench_->hiveConfig(),
ioExecutor_.get());
connector::registerConnector(hiveConnector);
parquet::registerParquetReaderFactory();
parquet::registerParquetWriterFactory();
Expand All @@ -59,25 +59,14 @@ class GCSInsertTest : public testing::Test, public test::InsertTest {
connector::unregisterConnector(exec::test::kHiveConnectorId);
}

std::shared_ptr<const config::ConfigBase> gcsOptions() const {
static std::unordered_map<std::string, std::string> configOverride = {};

configOverride["hive.gcs.scheme"] = "http";
configOverride["hive.gcs.endpoint"] = "localhost:" + testbench_->port();
return std::make_shared<const config::ConfigBase>(
std::move(configOverride));
}

static std::shared_ptr<GcsTestbench> testbench_;
std::shared_ptr<GCSTestbench> testbench_;
std::unique_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
};

std::shared_ptr<GcsTestbench> GCSInsertTest::testbench_ = nullptr;
} // namespace

TEST_F(GCSInsertTest, gcsInsertTest) {
const int64_t kExpectedRows = 1'000;
const auto gcsBucket = gcsURI(testbench_->preexistingBucketName());
const auto gcsBucket = gcsURI(testbench_->preexistingBucketName(), "");
runInsertTest(gcsBucket, kExpectedRows, pool());
}
} // namespace facebook::velox::filesystems
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <google/cloud/storage/client.h>
#include "gtest/gtest.h"

#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/gcs/GCSUtil.h"
#include "velox/exec/tests/utils/PortUtil.h"

Expand All @@ -30,11 +31,11 @@ namespace gcs = google::cloud::storage;

namespace facebook::velox::filesystems {

class GcsTestbench : public testing::Environment {
class GCSTestbench : public testing::Environment {
public:
GcsTestbench() {
auto port = facebook::velox::exec::test::getFreePorts(1);
port_ = std::to_string(port[0]);
GCSTestbench() {
auto port = std::to_string(facebook::velox::exec::test::getFreePorts(1)[0]);
endpoint_ = "http://localhost:" + port;
std::vector<std::string> names{"python3", "python"};
// If the build script or application developer provides a value in the
// PYTHON environment variable, then just use that.
Expand All @@ -58,7 +59,7 @@ class GcsTestbench : public testing::Environment {
"-m",
"testbench",
"--port",
port_,
port,
group_);
if (serverProcess_.valid() && serverProcess_.running())
break;
Expand All @@ -71,7 +72,7 @@ class GcsTestbench : public testing::Environment {
error_ = std::move(error);
}

~GcsTestbench() override {
~GCSTestbench() override {
// Brutal shutdown, kill the full process group because the GCS testbench
// may launch additional children.
group_.terminate();
Expand All @@ -80,12 +81,18 @@ class GcsTestbench : public testing::Environment {
}
}

const std::string& port() const {
return port_;
}
std::shared_ptr<const config::ConfigBase> hiveConfig(
const std::unordered_map<std::string, std::string> configOverride = {})
const {
std::unordered_map<std::string, std::string> config(
{{"hive.gcs.endpoint", endpoint_}});

// Update the default config map with the supplied configOverride map
for (const auto& [configName, configValue] : configOverride) {
config[configName] = configValue;
}

const std::string& error() const {
return error_;
return std::make_shared<const config::ConfigBase>(std::move(config));
}

std::string_view preexistingBucketName() {
Expand All @@ -98,13 +105,13 @@ class GcsTestbench : public testing::Environment {

void bootstrap() {
ASSERT_THAT(this, ::testing::NotNull());
ASSERT_THAT(this->error(), ::testing::IsEmpty());
ASSERT_THAT(this->error_, ::testing::IsEmpty());

// Create a bucket and a small file in the testbench. This makes it easier
// to bootstrap GcsFileSystem and its tests.
auto client = gcs::Client(
google::cloud::Options{}
.set<gcs::RestEndpointOption>("http://localhost:" + this->port())
.set<gcs::RestEndpointOption>(this->endpoint_)
.set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));

bucketName_ = "test1-gcs";
Expand All @@ -122,7 +129,7 @@ class GcsTestbench : public testing::Environment {
}

private:
std::string port_;
std::string endpoint_;
bp::child serverProcess_;
bp::group group_;
std::string error_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ using namespace facebook::velox::exec::test;
namespace facebook::velox {
namespace {

class S3MultipleEndpoints : public S3Test {
class S3MultipleEndpoints : public S3Test, public ::test::VectorTestBase {
public:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace facebook::velox::exec::test;
namespace facebook::velox {
namespace {

class S3ReadTest : public S3Test {
class S3ReadTest : public S3Test, public ::test::VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
Expand Down

0 comments on commit 2c7c5d8

Please sign in to comment.