Skip to content

Commit

Permalink
Merge branch 'main' into admin-streaming-c++-api-lifecycle
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Marantz <[email protected]>
  • Loading branch information
jmarantz committed Mar 19, 2024
2 parents 1a7cd3e + 1b768b4 commit c7e5305
Show file tree
Hide file tree
Showing 24 changed files with 250 additions and 76 deletions.
4 changes: 3 additions & 1 deletion envoy/filesystem/watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "envoy/common/platform.h"
#include "envoy/common/pure.h"

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

namespace Envoy {
Expand Down Expand Up @@ -35,8 +36,9 @@ class Watcher {
* for the given directory.
* @param events supplies the events to watch.
* @param cb supplies the callback to invoke when a change occurs.
* @return a failure status if the file does not exist
*/
virtual void addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) PURE;
virtual absl::Status addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) PURE;
};

using WatcherPtr = std::unique_ptr<Watcher>;
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/watched_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ namespace Config {
WatchedDirectory::WatchedDirectory(const envoy::config::core::v3::WatchedDirectory& config,
Event::Dispatcher& dispatcher) {
watcher_ = dispatcher.createFilesystemWatcher();
watcher_->addWatch(absl::StrCat(config.path(), "/"), Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { cb_(); });
THROW_IF_NOT_OK(watcher_->addWatch(absl::StrCat(config.path(), "/"),
Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { cb_(); }));
}

} // namespace Config
Expand Down
7 changes: 4 additions & 3 deletions source/common/filesystem/inotify/watcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,24 @@ WatcherImpl::WatcherImpl(Event::Dispatcher& dispatcher, Filesystem::Instance& fi

WatcherImpl::~WatcherImpl() { close(inotify_fd_); }

void WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb callback) {
absl::Status WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb callback) {
// Because of general inotify pain, we always watch the directory that the file lives in,
// and then synthetically raise per file events.
auto result_or_error = file_system_.splitPathFromFilename(path);
THROW_IF_STATUS_NOT_OK(result_or_error, throw);
RETURN_IF_STATUS_NOT_OK(result_or_error);
const PathSplitResult result = result_or_error.value();

const uint32_t watch_mask = IN_MODIFY | IN_MOVED_TO;
int watch_fd = inotify_add_watch(inotify_fd_, std::string(result.directory_).c_str(), watch_mask);
if (watch_fd == -1) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("unable to add filesystem watch for file {}: {}", path, errorDetails(errno)));
}

ENVOY_LOG(debug, "added watch for directory: '{}' file: '{}' fd: {}", result.directory_,
result.file_, watch_fd);
callback_map_[watch_fd].watches_.push_back({std::string(result.file_), events, callback});
return absl::OkStatus();
}

void WatcherImpl::onInotifyEvent() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/filesystem/inotify/watcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class WatcherImpl : public Watcher, Logger::Loggable<Logger::Id::file> {
~WatcherImpl() override;

// Filesystem::Watcher
void addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;
absl::Status addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;

private:
struct FileWatch {
Expand Down
6 changes: 4 additions & 2 deletions source/common/filesystem/kqueue/watcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ WatcherImpl::~WatcherImpl() {
watches_.clear();
}

void WatcherImpl::addWatch(absl::string_view path, uint32_t events, Watcher::OnChangedCb cb) {
absl::Status WatcherImpl::addWatch(absl::string_view path, uint32_t events,
Watcher::OnChangedCb cb) {
FileWatchPtr watch = addWatch(path, events, cb, false);
if (watch == nullptr) {
throwEnvoyExceptionOrPanic(absl::StrCat("invalid watch path ", path));
return absl::InvalidArgumentError(absl::StrCat("invalid watch path ", path));
}
return absl::OkStatus();
}

WatcherImpl::FileWatchPtr WatcherImpl::addWatch(absl::string_view path, uint32_t events,
Expand Down
2 changes: 1 addition & 1 deletion source/common/filesystem/kqueue/watcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class WatcherImpl : public Watcher, Logger::Loggable<Logger::Id::file> {
~WatcherImpl();

// Filesystem::Watcher
void addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;
absl::Status addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;

private:
struct FileWatch : LinkedObject<FileWatch> {
Expand Down
9 changes: 5 additions & 4 deletions source/common/filesystem/win32/watcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ WatcherImpl::~WatcherImpl() {
::CloseHandle(thread_exit_event_);
}

void WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) {
absl::Status WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) {
if (path == Platform::null_device_path) {
return;
return absl::OkStatus();
}

const absl::StatusOr<PathSplitResult> result_or_error = file_system_.splitPathFromFilename(path);
THROW_IF_STATUS_NOT_OK(result_or_error, throw);
RETURN_IF_STATUS_NOT_OK(result_or_error);
const PathSplitResult& result = result_or_error.value();
// ReadDirectoryChangesW only has a Unicode version, so we need
// to use wide strings here
Expand All @@ -67,7 +67,7 @@ void WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb
directory.c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
nullptr, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL);
if (dir_handle == INVALID_HANDLE_VALUE) {
throw EnvoyException(
return absl::InvalidArgumentError(
fmt::format("unable to open directory {}: {}", result.directory_, GetLastError()));
}
std::string fii_key(sizeof(FILE_ID_INFO), '\0');
Expand Down Expand Up @@ -108,6 +108,7 @@ void WatcherImpl::addWatch(absl::string_view path, uint32_t events, OnChangedCb

callback_map_[fii_key]->watches_.push_back({file, events, cb});
ENVOY_LOG(debug, "added watch for file '{}' in directory '{}'", result.file_, result.directory_);
return absl::OkStatus();
}

void WatcherImpl::onDirectoryEvent() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/filesystem/win32/watcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class WatcherImpl : public Watcher, Logger::Loggable<Logger::Id::file> {
~WatcherImpl();

// Filesystem::Watcher
void addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;
absl::Status addWatch(absl::string_view path, uint32_t events, OnChangedCb cb) override;

private:
static void issueFirstRead(ULONG_PTR param);
Expand Down
31 changes: 23 additions & 8 deletions source/common/formatter/substitution_format_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ namespace Formatter {
*/
class SubstitutionFormatStringUtils {
public:
using FormattersConfig =
ProtobufWkt::RepeatedPtrField<envoy::config::core::v3::TypedExtensionConfig>;

/**
* Generate a formatter object from config SubstitutionFormatString.
* Parse list of formatter configurations to commands.
*/
template <class FormatterContext = HttpFormatterContext>
static FormatterBasePtr<FormatterContext>
fromProtoConfig(const envoy::config::core::v3::SubstitutionFormatString& config,
static std::vector<CommandParserBasePtr<FormatterContext>>
parseFormatters(const FormattersConfig& formatters,
Server::Configuration::GenericFactoryContext& context) {
// Instantiate formatter extensions.
std::vector<CommandParserBasePtr<FormatterContext>> commands;
for (const auto& formatter : config.formatters()) {
for (const auto& formatter : formatters) {
auto* factory =
Envoy::Config::Utility::getFactory<CommandParserFactoryBase<FormatterContext>>(formatter);
if (!factory) {
Expand All @@ -47,12 +49,24 @@ class SubstitutionFormatStringUtils {
commands.push_back(std::move(parser));
}

return commands;
}

/**
* Generate a formatter object from config SubstitutionFormatString.
*/
template <class FormatterContext = HttpFormatterContext>
static FormatterBasePtr<FormatterContext>
fromProtoConfig(const envoy::config::core::v3::SubstitutionFormatString& config,
Server::Configuration::GenericFactoryContext& context) {
// Instantiate formatter extensions.
auto commands = parseFormatters<FormatterContext>(config.formatters(), context);
switch (config.format_case()) {
case envoy::config::core::v3::SubstitutionFormatString::FormatCase::kTextFormat:
return std::make_unique<FormatterBaseImpl<FormatterContext>>(
config.text_format(), config.omit_empty_values(), commands);
case envoy::config::core::v3::SubstitutionFormatString::FormatCase::kJsonFormat:
return std::make_unique<JsonFormatterBaseImpl<FormatterContext>>(
return createJsonFormatter<FormatterContext>(
config.json_format(), true, config.omit_empty_values(),
config.has_json_format_options() ? config.json_format_options().sort_properties() : false,
commands);
Expand All @@ -75,9 +89,10 @@ class SubstitutionFormatStringUtils {
template <class FormatterContext = HttpFormatterContext>
static FormatterBasePtr<FormatterContext>
createJsonFormatter(const ProtobufWkt::Struct& struct_format, bool preserve_types,
bool omit_empty_values, bool sort_properties) {
bool omit_empty_values, bool sort_properties,
const std::vector<CommandParserBasePtr<FormatterContext>>& commands = {}) {
return std::make_unique<JsonFormatterBaseImpl<FormatterContext>>(
struct_format, preserve_types, omit_empty_values, sort_properties);
struct_format, preserve_types, omit_empty_values, sort_properties, commands);
}
};

Expand Down
8 changes: 6 additions & 2 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,12 @@ LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator
if (watcher_ == nullptr) {
watcher_ = dispatcher.createFilesystemWatcher();
}
watcher_->addWatch(layer.disk_layer().symlink_root(), Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) -> void { THROW_IF_NOT_OK(loadNewSnapshot()); });
creation_status = watcher_->addWatch(
layer.disk_layer().symlink_root(), Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) -> void { THROW_IF_NOT_OK(loadNewSnapshot()); });
if (!creation_status.ok()) {
return;
}
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kRtdsLayer:
subscriptions_.emplace_back(
Expand Down
6 changes: 3 additions & 3 deletions source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ absl::Status SdsApi::onConfigUpdate(const std::vector<Config::DecodedResourceRef
// on directory level (e.g. Kubernetes secret update).
const auto result_or_error = api_.fileSystem().splitPathFromFilename(filename);
RETURN_IF_STATUS_NOT_OK(result_or_error);
watcher_->addWatch(absl::StrCat(result_or_error.value().directory_, "/"),
Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { onWatchUpdate(); });
RETURN_IF_NOT_OK(watcher_->addWatch(absl::StrCat(result_or_error.value().directory_, "/"),
Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { onWatchUpdate(); }));
}
} else {
watcher_.reset(); // Destroy the old watch if any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ template <class T, size_t (*deleter)(T*), unsigned (*getDictId)(const T*)> class
envoy::config::core::v3::DataSource::SpecifierCase::kFilename) {
is_watch_added = true;
const auto& filename = source.filename();
watcher_->addWatch(
THROW_IF_NOT_OK(watcher_->addWatch(
filename, Filesystem::Watcher::Events::Modified | Filesystem::Watcher::Events::MovedTo,
[this, id, filename](uint32_t) { onDictionaryUpdate(id, filename); });
[this, id, filename](uint32_t) { onDictionaryUpdate(id, filename); }));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
stats_(stats), api_(api), validation_visitor_(validation_visitor) {
if (!path_config_source.has_watched_directory()) {
file_watcher_ = dispatcher.createFilesystemWatcher();
file_watcher_->addWatch(path_, Filesystem::Watcher::Events::MovedTo, [this](uint32_t) {
if (started_) {
refresh();
}
});
THROW_IF_NOT_OK(
file_watcher_->addWatch(path_, Filesystem::Watcher::Events::MovedTo, [this](uint32_t) {
if (started_) {
refresh();
}
}));
} else {
directory_watcher_ =
std::make_unique<WatchedDirectory>(path_config_source.watched_directory(), dispatcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ InjectedResourceMonitor::InjectedResourceMonitor(
Server::Configuration::ResourceMonitorFactoryContext& context)
: filename_(config.filename()),
watcher_(context.mainThreadDispatcher().createFilesystemWatcher()), api_(context.api()) {
watcher_->addWatch(filename_, Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { onFileChanged(); });
THROW_IF_NOT_OK(watcher_->addWatch(filename_, Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) { onFileChanged(); }));
}

void InjectedResourceMonitor::onFileChanged() { file_changed_ = true; }
Expand Down
3 changes: 2 additions & 1 deletion test/common/config/watched_directory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "gtest/gtest.h"

using testing::DoAll;
using testing::Return;
using testing::SaveArg;

Expand All @@ -21,7 +22,7 @@ TEST(WatchedDirectory, All) {
EXPECT_CALL(dispatcher, createFilesystemWatcher_()).WillOnce(Return(watcher));
Filesystem::Watcher::OnChangedCb cb;
EXPECT_CALL(*watcher, addWatch("foo/bar/", Filesystem::Watcher::Events::MovedTo, _))
.WillOnce(SaveArg<2>(&cb));
.WillOnce(DoAll(SaveArg<2>(&cb), Return(absl::OkStatus())));
WatchedDirectory wd(config, dispatcher);
bool called = false;
wd.setCallback([&called] { called = true; });
Expand Down
Loading

0 comments on commit c7e5305

Please sign in to comment.