
Commit

Merge branch 'main' into MINIFICPP-2346-P1
Merging latest upstream main into MINIFICPP-2346-P1 branch
james94 committed Jun 17, 2024
2 parents 1795ec8 + 29bdc8e commit 42b23c6
Showing 18 changed files with 80 additions and 79 deletions.
2 changes: 1 addition & 1 deletion C2.md
@@ -113,7 +113,7 @@ Flow id and URL are usually retrieved from the C2 server. These identify the las

# in minifi.properties
nifi.c2.flow.id=8da5de7f-dcdb-4f6b-aa2f-6f162a7f9dc4
nifi.c2.flow.url=http://localhost:10090/efm/api/flows/8da5de7f-dcdb-4f6b-aa2f-6f162a7f9dc4/content?aid=efmtest
nifi.c2.flow.url=http://localhost:10090/c2-server/api/flows/8da5de7f-dcdb-4f6b-aa2f-6f162a7f9dc4/content?aid=efmtest

#### Agent Identifier Fallback

7 changes: 7 additions & 0 deletions CONFIGURE.md
@@ -429,6 +429,13 @@ Rocksdb has an option to run compaction at specific intervals not just when need
nifi.flowfile.repository.rocksdb.compaction.period=2 min
nifi.database.content.repository.rocksdb.compaction.period=2 min

### Configuring synchronous or asynchronous writes for RocksDB content repository

RocksDB has an option to set synchronous writes for its database, ensuring that the write operation does not return until the data being written has been pushed all the way to persistent storage. In MiNiFi C++, this is set to true by default to avoid any data loss, as according to the RocksDB documentation using non-sync writes can result in data loss in the event of a host machine crash. You can read more information about this option on the [RocksDB wiki page](https://github.com/facebook/rocksdb/wiki/Basic-Operations#synchronous-writes). If you prefer to use non-sync writes in your content repository for better write performance and can accept the possibility of the mentioned data loss, you can set this option to false.

# in minifi.properties
nifi.content.repository.rocksdb.use.synchronous.writes=true

### Global RocksDB options

There are a few options for RocksDB that are set for all used RocksDB databases in MiNiFi:
3 changes: 3 additions & 0 deletions conf/minifi.properties
@@ -35,6 +35,9 @@ nifi.provenance.repository.class.name=NoOpRepository
nifi.content.repository.class.name=DatabaseContentRepository
# nifi.content.repository.rocksdb.compression=auto

# Use synchronous writes for the RocksDB content repository. Disable for better write performance, if data loss is acceptable in case of the host crashing.
# nifi.content.repository.rocksdb.use.synchronous.writes=true

## Relates to the internal workings of the rocksdb backend
# nifi.flowfile.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.rocksdb.compaction.period=2 min
6 changes: 3 additions & 3 deletions encrypt-config/tests/resources/minifi.properties
@@ -59,9 +59,9 @@ nifi.c2.agent.protocol.class=RESTSender
#nifi.c2.agent.coap.port=
## base URL of the c2 server,
## very likely the same base url of rest urls
nifi.c2.flow.base.url=http://localhost:10080/efm/api
nifi.c2.rest.url=http://localhost:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://localhost:10080/efm/api/c2-protocol/acknowledge
nifi.c2.flow.base.url=http://localhost:10080/c2-server/api
nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
#nifi.c2.full.heartbeat=false
@@ -61,9 +61,9 @@ nifi.c2.agent.protocol.class=RESTSender
#nifi.c2.agent.coap.port=
## base URL of the c2 server,
## very likely the same base url of rest urls
nifi.c2.flow.base.url=http://localhost:10080/efm/api
nifi.c2.rest.url=http://localhost:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://localhost:10080/efm/api/c2-protocol/acknowledge
nifi.c2.flow.base.url=http://localhost:10080/c2-server/api
nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
#nifi.c2.full.heartbeat=false
9 changes: 3 additions & 6 deletions extensions/aws/s3/MultipartUploadStateStorage.cpp
@@ -26,7 +26,7 @@ void MultipartUploadStateStorage::storeState(const std::string& bucket, const st
state_manager_->get(stored_state);
std::string state_key = bucket + "/" + key;
stored_state[state_key + ".upload_id"] = state.upload_id;
stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time_ms_since_epoch);
stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
@@ -56,10 +56,7 @@ std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const
MultipartUploadState state;
state.upload_id = state_map[state_key + ".upload_id"];

int64_t stored_upload_time = 0;
core::Property::StringToInt(state_map[state_key + ".upload_time"], stored_upload_time);
state.upload_time = Aws::Utils::DateTime(stored_upload_time);

core::Property::StringToInt(state_map[state_key + ".upload_time"], state.upload_time_ms_since_epoch);
core::Property::StringToInt(state_map[state_key + ".uploaded_parts"], state.uploaded_parts);
core::Property::StringToInt(state_map[state_key + ".uploaded_size"], state.uploaded_size);
core::Property::StringToInt(state_map[state_key + ".part_size"], state.part_size);
@@ -108,7 +105,7 @@ void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds mul

std::vector<std::string> keys_to_remove;
for (const auto& [property_key, value] : state_map) {
std::string upload_time_suffix = ".upload_time";
static constexpr std::string_view upload_time_suffix = ".upload_time";
if (!minifi::utils::string::endsWith(property_key, upload_time_suffix)) {
continue;
}
4 changes: 2 additions & 2 deletions extensions/aws/s3/MultipartUploadStateStorage.h
@@ -41,13 +41,13 @@ struct MultipartUploadState {
: upload_id(upload_id),
part_size(part_size),
full_size(full_size),
upload_time(upload_time) {}
upload_time_ms_since_epoch(upload_time.Millis()) {}
std::string upload_id;
size_t uploaded_parts{};
uint64_t uploaded_size{};
uint64_t part_size{};
uint64_t full_size{};
Aws::Utils::DateTime upload_time;
int64_t upload_time_ms_since_epoch;
std::vector<std::string> uploaded_etags;

bool operator==(const MultipartUploadState&) const = default;
63 changes: 12 additions & 51 deletions extensions/aws/tests/MultipartUploadStateStorageTest.cpp
@@ -44,53 +44,33 @@ class MultipartUploadStateStorageTestFixture {

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current key state", "[s3StateStorage]") {
REQUIRE(upload_storage_->getState("test_bucket", "key") == std::nullopt);
minifi::aws::s3::MultipartUploadState state;
state.upload_id = "id1";
minifi::aws::s3::MultipartUploadState state("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state.uploaded_parts = 2;
state.uploaded_size = 100_MiB;
state.part_size = 50_MiB;
state.full_size = 200_MiB;
state.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state.uploaded_etags = {"etag1", "etag2"};
upload_storage_->storeState("test_bucket", "key", state);
REQUIRE(*upload_storage_->getState("test_bucket", "key") == state);
}

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state from multiple keys and buckets", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1;
state1.upload_id = "id1";
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.part_size = 50_MiB;
state1.full_size = 200_MiB;
state1.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("old_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2;
state2.upload_id = "id2";
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.part_size = 50_MiB;
state2.full_size = 200_MiB;
state2.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key1", state2);
minifi::aws::s3::MultipartUploadState state3;
state3.upload_id = "id3";
minifi::aws::s3::MultipartUploadState state3("id3", 50_MiB, 400_MiB, Aws::Utils::DateTime::Now());
state3.uploaded_parts = 4;
state3.uploaded_size = 200_MiB;
state3.part_size = 50_MiB;
state3.full_size = 400_MiB;
state3.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state3.uploaded_etags = {"etag6", "etag7", "etag9", "etag8"};
upload_storage_->storeState("test_bucket", "key2", state3);
minifi::aws::s3::MultipartUploadState state4;
state2.upload_id = "id2";
minifi::aws::s3::MultipartUploadState state4("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 3;
state2.uploaded_size = 150_MiB;
state2.part_size = 50_MiB;
state2.full_size = 200_MiB;
state2.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state2.uploaded_etags = {"etag4", "etag5", "etag10"};
upload_storage_->storeState("test_bucket", "key1", state4);
REQUIRE(*upload_storage_->getState("test_bucket", "key1") == state4);
@@ -99,31 +79,19 @@ TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state f
}

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Remove state", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1;
state1.upload_id = "id1";
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.part_size = 50_MiB;
state1.full_size = 200_MiB;
state1.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("old_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2;
state2.upload_id = "id2";
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.part_size = 50_MiB;
state2.full_size = 200_MiB;
state2.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key1", state2);
minifi::aws::s3::MultipartUploadState state3;
state3.upload_id = "id3";
minifi::aws::s3::MultipartUploadState state3("id3", 50_MiB, 400_MiB, Aws::Utils::DateTime::Now());
state3.uploaded_parts = 4;
state3.uploaded_size = 200_MiB;
state3.part_size = 50_MiB;
state3.full_size = 400_MiB;
state3.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state3.uploaded_etags = {"etag6", "etag7", "etag9", "etag8"};
upload_storage_->storeState("test_bucket", "key2", state3);
REQUIRE(*upload_storage_->getState("old_bucket", "key1") == state1);
@@ -136,25 +104,18 @@ TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Remove state", "[s3Sta
}

TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Remove aged off state", "[s3StateStorage]") {
minifi::aws::s3::MultipartUploadState state1;
state1.upload_id = "id1";
using namespace std::literals::chrono_literals;
minifi::aws::s3::MultipartUploadState state1("id1", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now() - 10min);
state1.uploaded_parts = 3;
state1.uploaded_size = 150_MiB;
state1.part_size = 50_MiB;
state1.full_size = 200_MiB;
state1.upload_time = Aws::Utils::DateTime(Aws::Utils::DateTime::CurrentTimeMillis()) - std::chrono::minutes(10);
state1.uploaded_etags = {"etag1", "etag2", "etag3"};
upload_storage_->storeState("test_bucket", "key1", state1);
minifi::aws::s3::MultipartUploadState state2;
state2.upload_id = "id2";
minifi::aws::s3::MultipartUploadState state2("id2", 50_MiB, 200_MiB, Aws::Utils::DateTime::Now());
state2.uploaded_parts = 2;
state2.uploaded_size = 100_MiB;
state2.part_size = 50_MiB;
state2.full_size = 200_MiB;
state2.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
state2.uploaded_etags = {"etag4", "etag5"};
upload_storage_->storeState("test_bucket", "key2", state2);
upload_storage_->removeAgedStates(std::chrono::milliseconds(10));
upload_storage_->removeAgedStates(10min);
REQUIRE(upload_storage_->getState("test_bucket", "key1") == std::nullopt);
REQUIRE(upload_storage_->getState("test_bucket", "key2") == state2);
}
5 changes: 3 additions & 2 deletions extensions/lua/LuaScriptEngine.h
@@ -52,8 +52,9 @@ class LuaScriptEngine {
void call(const std::string& fn_name, Args&& ...args) {
sol::protected_function_result function_result{};
try {
sol::protected_function fn = lua_[fn_name.c_str()];
function_result = fn(convert(args)...);
if (sol::protected_function fn = lua_[fn_name.c_str()]) {
function_result = fn(convert(args)...);
}
} catch (const std::exception& e) {
throw LuaScriptException(e.what());
}
10 changes: 10 additions & 0 deletions extensions/lua/tests/TestExecuteScriptProcessorWithLuaScript.cpp
@@ -30,6 +30,16 @@

namespace org::apache::nifi::minifi::processors::test {

TEST_CASE("Lua: hello world") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};

execute_script->setProperty(ExecuteScript::ScriptEngine, "lua");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(print("Hello world!"))");

CHECK_NOTHROW(controller.trigger());
}

TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
TestController test_controller;
auto plan = test_controller.createPlan();
@@ -29,6 +29,16 @@

namespace org::apache::nifi::minifi::processors::test {

TEST_CASE("Python: hello world") {
const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");
minifi::test::SingleProcessorTestController controller{execute_script};

execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
execute_script->setProperty(ExecuteScript::ScriptBody, R"(print("Hello world!"))");

CHECK_NOTHROW(controller.trigger());
}

TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
TestController test_controller;
auto plan = test_controller.createPlan();
10 changes: 7 additions & 3 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -80,6 +80,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
logger_->log_error("NiFi Content DB Repository database open {} fail", directory_);
is_valid_ = false;
}

use_synchronous_writes_ = configuration->get(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false";
return is_valid_;
}

@@ -137,10 +139,12 @@ void DatabaseContentRepository::stop() {
}
}

DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}
DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository, bool use_synchronous_writes)
: BufferedContentSession(std::move(repository)),
use_synchronous_writes_(use_synchronous_writes) {}

std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
return std::make_shared<Session>(sharedFromThis());
return std::make_shared<Session>(sharedFromThis(), use_synchronous_writes_);
}

void DatabaseContentRepository::Session::commit() {
@@ -172,7 +176,7 @@ }
}

rocksdb::WriteOptions options;
options.sync = true;
options.sync = use_synchronous_writes_;
rocksdb::Status status = opendb->Write(options, &batch);
if (!status.ok()) {
throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
5 changes: 4 additions & 1 deletion extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -36,9 +36,11 @@ namespace org::apache::nifi::minifi::core::repository {
class DatabaseContentRepository : public core::ContentRepository {
class Session : public BufferedContentSession {
public:
explicit Session(std::shared_ptr<ContentRepository> repository);
explicit Session(std::shared_ptr<ContentRepository> repository, bool use_synchronous_writes);

void commit() override;
private:
bool use_synchronous_writes_;
};

static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
@@ -103,6 +105,7 @@ class DatabaseContentRepository : public core::ContentRepository {
std::mutex keys_mtx_;
std::vector<std::string> keys_to_delete_;
std::unique_ptr<utils::StoppableThread> gc_thread_;
bool use_synchronous_writes_ = true;
};

} // namespace org::apache::nifi::minifi::core::repository
7 changes: 4 additions & 3 deletions extensions/rocksdb-repos/RocksDbStream.cpp
@@ -28,7 +28,7 @@

namespace org::apache::nifi::minifi::io {

RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch)
RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, minifi::internal::WriteBatch* batch, bool use_synchronous_writes)
: BaseStream(),
path_(std::move(path)),
write_enable_(write_enable),
@@ -39,7 +39,8 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
}()),
offset_(0),
batch_(batch),
size_(value_.size()) {
size_(value_.size()),
use_synchronous_writes_(use_synchronous_writes) {
}

void RocksDbStream::close() {
@@ -67,7 +68,7 @@ size_t RocksDbStream::write(const uint8_t *value, size_t size) {
status = batch_->Merge(path_, slice_value);
} else {
rocksdb::WriteOptions opts;
opts.sync = true;
opts.sync = use_synchronous_writes_;
status = opendb->Merge(opts, path_, slice_value);
}
if (status.ok()) {
4 changes: 3 additions & 1 deletion extensions/rocksdb-repos/RocksDbStream.h
@@ -44,7 +44,8 @@ class RocksDbStream : public io::BaseStream {
* File Stream constructor that accepts an fstream shared pointer.
* It must already be initialized for read and write.
*/
explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false, minifi::internal::WriteBatch* batch = nullptr);
explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false,
minifi::internal::WriteBatch* batch = nullptr, bool use_synchronous_writes = true);

~RocksDbStream() override {
close();
@@ -89,6 +90,7 @@
size_t offset_;
minifi::internal::WriteBatch* batch_;
size_t size_;
bool use_synchronous_writes_;

private:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbStream>::getLogger();
1 change: 1 addition & 0 deletions libminifi/include/properties/Configuration.h
@@ -77,6 +77,7 @@ class Configuration : public Properties {
static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
static constexpr const char *nifi_dbcontent_repository_purge_period = "nifi.database.content.repository.purge.period";
static constexpr const char *nifi_content_repository_rocksdb_use_synchronous_writes = "nifi.content.repository.rocksdb.use.synchronous.writes";

static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";