From a931af780cd90c843af78d7eedfcae6b85bed7be Mon Sep 17 00:00:00 2001 From: Matthew Johnson Date: Thu, 20 Jun 2024 12:08:04 -0400 Subject: [PATCH] enable external publishing --- .gitignore | 4 ++ CMakeLists.txt | 1 + README.md | 5 ++ admin/admin_server.cc | 19 +++--- admin/admin_server_test.cc | 16 +++--- bin/spectatord_main.cc | 54 +++++++++-------- build.sh | 9 ++- conanfile.py | 19 +++++- metatron/CMakeLists.txt | 26 +++++++++ metatron/metatron_config.h | 22 +++++++ metatron/metatron_sample.cc | 22 +++++++ spectator/CMakeLists.txt | 2 +- spectator/config.h | 5 +- spectator/http_client.cc | 105 +++++++++++++++++++++------------- spectator/http_client.h | 12 ++-- spectator/http_client_test.cc | 3 +- spectator/publisher.h | 22 +++---- 17 files changed, 248 insertions(+), 98 deletions(-) create mode 100644 metatron/CMakeLists.txt create mode 100644 metatron/metatron_config.h create mode 100644 metatron/metatron_sample.cc diff --git a/.gitignore b/.gitignore index 8d03786..c896401 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,10 @@ .idea/ cmake-build-debug cmake-build/ +metatron/auth_context.pb.cc +metatron/auth_context.pb.h +metatron/auth_context.proto +metatron/metatron_config.cc ska/ spectator/netflix_config.cc venv/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 356ec7a..f58554b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,7 @@ conan_basic_setup() add_subdirectory(admin) add_subdirectory(bench) +add_subdirectory(metatron) add_subdirectory(server) add_subdirectory(spectator) add_subdirectory(tools) diff --git a/README.md b/README.md index fc2ba17..84c86a0 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,14 @@ spectatord: A daemon that listens for metrics and reports them to Atlas. default: ""; --debug (Debug spectatord. All values will be sent to a dev aggregator and dropped.); default: false; + --enable_external (Enable external publishing.); default: false; --enable_socket (Enable UNIX domain socket support. Default is true on Linux and false on MacOS.); default: true; --enable_statsd (Enable statsd support.); default: false; + --metatron_dir (Path to the Metatron certificates, which are used for + external publishing. A number of well-known directories are searched by + default. This option is only necessary if your certificates are in an + unusual location.); default: ""; --meter_ttl (Meter TTL: expire meters after this period of inactivity.); default: 15m; --no_common_tags (No common tags will be provided for metrics. Since no diff --git a/admin/admin_server.cc b/admin/admin_server.cc index a5bcad6..70c1c05 100644 --- a/admin/admin_server.cc +++ b/admin/admin_server.cc @@ -49,19 +49,24 @@ void GET_config(HTTPServerRequest& req, HTTPServerResponse& res, Object::Ptr obj = new Object(true); Object::Ptr common_tags = new Object(true); - for (auto& pair : config.common_tags) + for (auto& pair : config.common_tags) { common_tags->set(pair.first, pair.second); - obj->set("common_tags", common_tags); + } - obj->set("read_timeout", ToDoubleMilliseconds(config.read_timeout)); - obj->set("connect_timeout", ToDoubleMilliseconds(config.connect_timeout)); + obj->set("age_gauge_limit", config.age_gauge_limit); obj->set("batch_size", config.batch_size); + obj->set("common_tags", common_tags); + obj->set("connect_timeout", ToDoubleMilliseconds(config.connect_timeout)); + obj->set("expiration_frequency", ToDoubleMilliseconds(config.expiration_frequency)); + obj->set("external_enabled", config.external_enabled); + obj->set("external_uri", config.external_uri); obj->set("frequency", ToDoubleMilliseconds(config.frequency)); - obj->set("expiration_frequency", - ToDoubleMilliseconds(config.expiration_frequency)); + obj->set("metatron_dir", config.metatron_dir); obj->set("meter_ttl", ToDoubleMilliseconds(config.meter_ttl)); - obj->set("age_gauge_limit", config.age_gauge_limit); + obj->set("read_timeout", ToDoubleMilliseconds(config.read_timeout)); + obj->set("status_metrics_enabled", config.status_metrics_enabled); obj->set("uri", config.uri); + obj->set("verbose_http", config.verbose_http); res.setStatus(HTTPResponse::HTTP_OK); res.setContentType("application/json"); diff --git a/admin/admin_server_test.cc b/admin/admin_server_test.cc index 9bcbb86..c675819 100644 --- a/admin/admin_server_test.cc +++ b/admin/admin_server_test.cc @@ -161,17 +161,19 @@ TEST_F(AdminServerTest, GET_config) { Var result {parser.parse(rr)}; Object::Ptr object = result.extract(); - int count = 0; std::vector expected_keys {"age_gauge_limit", "batch_size", "common_tags", - "connect_timeout", "expiration_frequency", "frequency", - "meter_ttl", "read_timeout", "uri"}; + "connect_timeout", "expiration_frequency", + "external_enabled", "external_uri", "frequency", + "metatron_dir", "meter_ttl", "read_timeout", + "status_metrics_enabled", "uri", "verbose_http"}; + std::vector found_keys; + for (auto &it : *object) { - if (contains(expected_keys, it.first)) { - count += 1; - } + found_keys.emplace_back(it.first); } + std::sort(found_keys.begin(), found_keys.end()); - EXPECT_EQ(count, expected_keys.size()); + EXPECT_EQ(found_keys, expected_keys); } TEST_F(AdminServerTest, GET_config_common_tags) { diff --git a/bin/spectatord_main.cc b/bin/spectatord_main.cc index dd4b40e..5f9d344 100644 --- a/bin/spectatord_main.cc +++ b/bin/spectatord_main.cc @@ -43,14 +43,19 @@ auto AbslParseFlag(absl::string_view text, PortNumber* p, std::string* error) return true; } -ABSL_FLAG(PortNumber, port, PortNumber(1234), - "Port number for the UDP socket."); -ABSL_FLAG(bool, enable_statsd, false, - "Enable statsd support."); -ABSL_FLAG(PortNumber, statsd_port, PortNumber(8125), - "Port number for the statsd socket."); ABSL_FLAG(PortNumber, admin_port, PortNumber(1234), "Port number for the admin server."); +ABSL_FLAG(size_t, age_gauge_limit, 1000, + "The maximum number of age gauges that may be reported by this process."); +ABSL_FLAG(std::string, common_tags, "", + "Common tags: nf.app=app,nf.cluster=cluster. Override the default common " + "tags. If empty, then spectatord will use the default set. " + "This flag should only be used by experts who understand the risks."); +ABSL_FLAG(bool, debug, false, + "Debug spectatord. All values will be sent to a dev aggregator and " + "dropped."); +ABSL_FLAG(bool, enable_external, false, + "Enable external publishing."); #ifdef __linux__ ABSL_FLAG(bool, enable_socket, true, "Enable UNIX domain socket support. Default is true on Linux and false " @@ -60,30 +65,31 @@ ABSL_FLAG(bool, enable_socket, false, "Enable UNIX domain socket support. Default is true on Linux and false " "on MacOS and Windows."); #endif -ABSL_FLAG(std::string, socket_path, "/run/spectatord/spectatord.unix", - "Path to the UNIX domain socket."); -ABSL_FLAG(std::string, uri, "", - "Optional override URI for the aggregator."); +ABSL_FLAG(bool, enable_statsd, false, + "Enable statsd support."); +ABSL_FLAG(std::string, metatron_dir, "", + "Path to the Metatron certificates, which are used for external publishing. A number " + "of well-known directories are searched by default. This option is only necessary " + "if your certificates are in an unusual location."); ABSL_FLAG(absl::Duration, meter_ttl, absl::Minutes(15), "Meter TTL: expire meters after this period of inactivity."); -ABSL_FLAG(size_t, age_gauge_limit, 1000, - "The maximum number of age gauges that may be reported by this process."); -ABSL_FLAG(std::string, common_tags, "", - "Common tags: nf.app=app,nf.cluster=cluster. Override the default common " - "tags. If empty, then spectatord will use the default set. " - "This flag should only be used by experts who understand the risks."); ABSL_FLAG(bool, no_common_tags, false, "No common tags will be provided for metrics. Since no common tags are available, no " "internal status metrics will be recorded. Only use this feature for special cases " "where it is absolutely necessary to override common tags such as nf.app, and only " "use it with a secondary spectatord process."); +ABSL_FLAG(PortNumber, port, PortNumber(1234), + "Port number for the UDP socket."); +ABSL_FLAG(std::string, socket_path, "/run/spectatord/spectatord.unix", + "Path to the UNIX domain socket."); +ABSL_FLAG(PortNumber, statsd_port, PortNumber(8125), + "Port number for the statsd socket."); +ABSL_FLAG(std::string, uri, "", + "Optional override URI for the aggregator."); ABSL_FLAG(bool, verbose, false, "Use verbose logging."); ABSL_FLAG(bool, verbose_http, false, "Output debug info for HTTP requests."); -ABSL_FLAG(bool, debug, false, - "Debug spectatord. All values will be sent to a dev aggregator and " - "dropped."); auto main(int argc, char** argv) -> int { auto logger = Logger(); @@ -93,18 +99,20 @@ auto main(int argc, char** argv) -> int { signals.end()); backward::SignalHandling sh{signals}; - absl::SetProgramUsageMessage( - "A daemon that listens for metrics and reports them to Atlas."); + absl::SetProgramUsageMessage("A daemon that listens for metrics and reports them to Atlas."); absl::ParseCommandLine(argc, argv); auto cfg = GetSpectatorConfig(); auto maybe_agg_uri = absl::GetFlag(FLAGS_uri); if (absl::GetFlag(FLAGS_debug)) { - cfg->uri = - "https://atlas-aggr-dev.us-east-1.ieptest.netflix.net/api/v4/update"; + cfg->uri = "https://atlas-aggr-dev.us-east-1.ieptest.netflix.net/api/v4/update"; } else if (!maybe_agg_uri.empty()) { cfg->uri = std::move(maybe_agg_uri); + } else if (absl::GetFlag(FLAGS_enable_external)) { + cfg->external_enabled = true; + cfg->metatron_dir = absl::GetFlag(FLAGS_metatron_dir); + cfg->uri = cfg->external_uri; } if (absl::GetFlag(FLAGS_verbose_http)) { diff --git a/build.sh b/build.sh index af38536..1cd60a3 100755 --- a/build.sh +++ b/build.sh @@ -12,7 +12,14 @@ NC="\033[0m" if [[ "$1" == "clean" ]]; then echo -e "${BLUE}==== clean ====${NC}" rm -rf $BUILD_DIR - rm -f nflx_spectator_cppconf-*.zip + # intermediate download files + rm -f netflix_spectator_cppconf-*.zip + rm -f spectatord_metatron-*.zip + # extracted and generated files + rm -f metatron/auth_context.pb.cc + rm -f metatron/auth_context.pb.h + rm -f metatron/auth_context.proto + rm -f metatron/metatron_config.cc rm -rf ska rm -f spectator/*.inc rm -f spectator/netflix_config.cc diff --git a/conanfile.py b/conanfile.py index 08738b2..7880583 100644 --- a/conanfile.py +++ b/conanfile.py @@ -17,6 +17,7 @@ class SpectatorDConan(ConanFile): "libcurl/8.4.0", "openssl/3.2.0", "poco/1.12.5p2", + "protobuf/3.21.12", "rapidjson/cci.20230929", "spdlog/1.12.0", "tsl-hopscotch-map/2.3.1", @@ -28,6 +29,7 @@ class SpectatorDConan(ConanFile): def configure(self): self.options["libcurl"].with_c_ares = True + self.options["libcurl"].with_ssl = "openssl" self.options["poco"].enable_data_mysql = False self.options["poco"].enable_data_postgresql = False self.options["poco"].enable_data_sqlite = False @@ -55,7 +57,7 @@ def get_netflix_spectator_cppconf(): return dir_name = "netflix_spectator_cppconf" commit = "d44c6513f52fba019181e8c59c4c306bd6451b8d" - zip_name = f"nflx_spectator_cfg-{commit}.zip" + zip_name = f"netflix_spectator_cppconf-{commit}.zip" download(f"https://stash.corp.netflix.com/rest/api/latest/projects/CLDMTA/repos/netflix-spectator-cppconf/archive?at={commit}&format=zip", zip_name) check_sha256(zip_name, "87cafb9306c2cd96477aea2d26ef311ff0b4342a3fa57fd29432411ce355cf6a") unzip(zip_name, destination=dir_name) @@ -63,6 +65,21 @@ def get_netflix_spectator_cppconf(): os.unlink(zip_name) shutil.rmtree(dir_name) + @staticmethod + def get_spectatord_metatron(): + if os.environ.get("NFLX_INTERNAL") != "ON": + return + dir_name = "spectatord_metatron" + commit = "07f0cbcf2d606561d636a1e22931aa8d23bcb7a3" + zip_name = f"spectatord_metatron-{commit}.zip" + download(f"https://stash.corp.netflix.com/rest/api/latest/projects/CLDMTA/repos/spectatord-metatron/archive?at={commit}&format=zip", zip_name) + check_sha256(zip_name, "a367d20d62d1ec57622fa325268e7be67b99e58b36ea22dd2e71eba2af853a6c") + unzip(zip_name, destination=dir_name) + shutil.move(f"{dir_name}/metatron/auth_context.proto", "metatron") + shutil.move(f"{dir_name}/metatron/metatron_config.cc", "metatron") + os.unlink(zip_name) + shutil.rmtree(dir_name) + def source(self): self.get_flat_hash_map() self.get_netflix_spectator_cppconf() diff --git a/metatron/CMakeLists.txt b/metatron/CMakeLists.txt new file mode 100644 index 0000000..84f828f --- /dev/null +++ b/metatron/CMakeLists.txt @@ -0,0 +1,26 @@ +#-- metatron verifier library +if(NFLX_INTERNAL) + add_library(metatron + "auth_context.pb.cc" + "auth_context.pb.h" + "metatron_config.cc" + "metatron_config.h" + ) +else() + add_library(metatron + "metatron_config.h" + "metatron_sample.cc" + ) +endif() + +target_include_directories(metatron PRIVATE + ${CMAKE_SOURCE_DIR} +) +target_link_libraries(metatron util ${CONAN_LIBS}) + +#-- protobuf generated files must exist in both the SOURCE_DIR and the BINARY_DIR +add_custom_command( + OUTPUT auth_context.pb.cc auth_context.pb.h + COMMAND ${CONAN_PROTOBUF_ROOT}/bin/protoc --proto_path=${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=${CMAKE_CURRENT_SOURCE_DIR} auth_context.proto + COMMAND ${CONAN_PROTOBUF_ROOT}/bin/protoc --proto_path=${CMAKE_CURRENT_SOURCE_DIR} --cpp_out=${CMAKE_CURRENT_BINARY_DIR} auth_context.proto +) diff --git a/metatron/metatron_config.h b/metatron/metatron_config.h new file mode 100644 index 0000000..2922aea --- /dev/null +++ b/metatron/metatron_config.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include + +namespace metatron { + +struct CertInfo { + std::string ssl_cert; + std::string ssl_key; + std::string ca_info; + std::string app_name; +}; + +CertInfo find_certificate(bool external_enabled, const std::string& metatron_dir); + +int metatron_verify_callback(int preverify_ok, X509_STORE_CTX *x509_ctx); + +CURLcode sslctx_metatron_verify(CURL *curl, void *ssl_ctx, void *parm); + +} // namespace metatron diff --git a/metatron/metatron_sample.cc b/metatron/metatron_sample.cc new file mode 100644 index 0000000..ce547bc --- /dev/null +++ b/metatron/metatron_sample.cc @@ -0,0 +1,22 @@ +#include "metatron_config.h" +#include "util/logger.h" + +namespace metatron { + +CertInfo find_certificate(bool /*unused*/, const std::string& /*unused*/) { + return CertInfo{"ssl_cert", "ssl_key", "ca_info", "app_name"}; +} + +int metatron_verify_callback(int /*unused*/, X509_STORE_CTX* /*unused*/) { + static auto logger = spectatord::Logger(); + logger->warn("Always stop the verification process with a verification failed state"); + return 0; +} + +CURLcode sslctx_metatron_verify(CURL* /*unused*/, void* /*unused*/, void* /*unused*/) { + static auto logger = spectatord::Logger(); + logger->warn("Always halt SSL processing with the sample config"); + return CURLE_SSL_CONNECT_ERROR; +} + +} // namespace metatron diff --git a/spectator/CMakeLists.txt b/spectator/CMakeLists.txt index 048fa4f..8e04e98 100644 --- a/spectator/CMakeLists.txt +++ b/spectator/CMakeLists.txt @@ -65,7 +65,7 @@ add_library(spectator OBJECT "version.h" ) target_include_directories(spectator PRIVATE ${CMAKE_SOURCE_DIR}) -target_link_libraries(spectator util ${CONAN_LIBS}) +target_link_libraries(spectator metatron util ${CONAN_LIBS}) #-- file generators, must exist where the outputs are referenced add_custom_command( diff --git a/spectator/config.h b/spectator/config.h index 849a892..50241ea 100644 --- a/spectator/config.h +++ b/spectator/config.h @@ -40,8 +40,11 @@ class Config { size_t age_gauge_limit{}; std::string uri; std::string external_uri; - bool verbose_http = false; + + std::string metatron_dir; + bool external_enabled = false; bool status_metrics_enabled = true; + bool verbose_http = false; // sub-classes can override this method implementing custom logic // that can disable publishing under certain conditions diff --git a/spectator/http_client.cc b/spectator/http_client.cc index c86a3ff..37559b1 100644 --- a/spectator/http_client.cc +++ b/spectator/http_client.cc @@ -28,21 +28,19 @@ class CurlHeaders { namespace { -auto curl_ignore_output_fun(char* /*unused*/, size_t size, size_t nmemb, void * - /*unused*/) -> size_t { +auto curl_ignore_output_fun(char* /*unused*/, size_t size, size_t nmemb, void * /*unused*/) + -> size_t { return size * nmemb; } -auto curl_capture_output_fun(char* contents, size_t size, size_t nmemb, - void* userp) -> size_t { +auto curl_capture_output_fun(char* contents, size_t size, size_t nmemb, void* userp) -> size_t { auto real_size = size * nmemb; auto* resp = static_cast(userp); resp->append(contents, real_size); return real_size; } -auto curl_capture_headers_fun(char* contents, size_t size, size_t nmemb, - void* userp) -> size_t { +auto curl_capture_headers_fun(char* contents, size_t size, size_t nmemb, void* userp) -> size_t { auto real_size = size * nmemb; auto end = contents + real_size; auto* headers = static_cast(userp); @@ -76,11 +74,11 @@ class CurlHandle { curl_easy_cleanup(handle_); } - auto handle() const noexcept -> CURL* { return handle_; } + [[nodiscard]] auto handle() const noexcept -> CURL* { return handle_; } - auto perform() -> CURLcode { return curl_easy_perform(handle()); } + [[nodiscard]] auto perform() const -> CURLcode { return curl_easy_perform(handle()); } - auto set_opt(CURLoption option, const void* param) -> CURLcode { + auto set_opt(CURLoption option, const void* param) const -> CURLcode { return curl_easy_setopt(handle(), option, param); } @@ -99,7 +97,7 @@ class CurlHandle { void move_headers(HttpHeaders* out) { *out = std::move(resp_headers_); } - void set_url(const std::string& url) { set_opt(CURLOPT_URL, url.c_str()); } + void set_url(const std::string& url) const { set_opt(CURLOPT_URL, url.c_str()); } void set_headers(std::shared_ptr headers) { headers_ = std::move(headers); @@ -127,19 +125,35 @@ class CurlHandle { curl_easy_setopt(handle_, CURLOPT_CUSTOMREQUEST, method); } + void configure_metatron(const HttpClientConfig& config) { + // provide metatron client certificate during handshake + curl_easy_setopt(handle_, CURLOPT_SSLCERT, config.cert_info.ssl_cert.c_str()); + curl_easy_setopt(handle_, CURLOPT_SSLKEY, config.cert_info.ssl_key.c_str()); + // disable use of system CAs + curl_easy_setopt(handle_, CURLOPT_CAPATH, NULL); + // perform full trust verification of server based on metatron CAs + curl_easy_setopt(handle_, CURLOPT_SSL_VERIFYPEER, 1); + curl_easy_setopt(handle_, CURLOPT_CAINFO, config.cert_info.ca_info.c_str()); + // install Metatron verifier and provide application name to check + curl_easy_setopt(handle_, CURLOPT_SSL_CTX_FUNCTION, metatron::sslctx_metatron_verify); + curl_easy_setopt(handle_, CURLOPT_SSL_CTX_DATA, config.cert_info.app_name.c_str()); + // disable hostname verification, SANs not present in metatron certs + curl_easy_setopt(handle_, CURLOPT_SSL_VERIFYHOST, 0); + // save any extended error message + curl_easy_setopt(handle_, CURLOPT_ERRORBUFFER, errbuf_); + } + void ignore_output() { curl_easy_setopt(handle_, CURLOPT_WRITEFUNCTION, curl_ignore_output_fun); } void capture_output() { curl_easy_setopt(handle_, CURLOPT_WRITEFUNCTION, curl_capture_output_fun); - curl_easy_setopt(handle_, CURLOPT_WRITEDATA, - static_cast(&response_)); + curl_easy_setopt(handle_, CURLOPT_WRITEDATA, static_cast(&response_)); } void capture_headers() { - curl_easy_setopt(handle_, CURLOPT_HEADERDATA, - static_cast(&resp_headers_)); + curl_easy_setopt(handle_, CURLOPT_HEADERDATA, static_cast(&resp_headers_)); curl_easy_setopt(handle_, CURLOPT_HEADERFUNCTION, curl_capture_headers_fun); } @@ -150,38 +164,41 @@ class CurlHandle { curl_easy_setopt(handle_, CURLOPT_VERBOSE, 1L); } + char* get_errbuf() { + return errbuf_; + } + private: CURL* handle_; std::shared_ptr headers_; const void* payload_ = nullptr; std::string response_; HttpHeaders resp_headers_; + char errbuf_[CURL_ERROR_SIZE]{}; }; } // namespace HttpClient::HttpClient(Registry* registry, HttpClientConfig config) - : registry_(registry), config_{config} {} + : registry_(registry), config_{std::move(config)} {} auto HttpClient::Get(const std::string& url) const -> HttpResponse { return perform("GET", url, std::make_shared(), nullptr, 0u, 0); } -auto HttpClient::Get(const std::string& url, - const std::vector& headers) const - -> HttpResponse { +auto HttpClient::Get(const std::string& url, const std::vector& headers) + const -> HttpResponse { return method_header("GET", url, headers); } -auto HttpClient::Put(const std::string& url, - const std::vector& headers) const - -> HttpResponse { +auto HttpClient::Put(const std::string& url, const std::vector& headers) + const -> HttpResponse { return method_header("PUT", url, headers); } auto HttpClient::method_header(const char* method, const std::string& url, - const std::vector& headers) const - -> HttpResponse { + const std::vector& headers) + const -> HttpResponse { auto curl_headers = std::make_shared(); for (const auto& h : headers) { curl_headers->append(h); @@ -207,30 +224,40 @@ auto HttpClient::perform(const char* method, const std::string& url, auto logger = registry_->GetLogger(); curl.set_url(url); curl.set_headers(headers); + if (strcmp("POST", method) == 0) { curl.post_payload(payload, size); } else if (strcmp("GET", method) != 0) { curl.custom_request(method); } + if (config_.external_enabled) { + curl.configure_metatron(config_); + } if (config_.read_body) { curl.capture_output(); } else { curl.ignore_output(); } - if (config_.read_headers) { curl.capture_headers(); } - if (config_.verbose_requests) { curl.trace_requests(); } + auto curl_res = curl.perform(); int http_code; if (curl_res != CURLE_OK) { - logger->error("Failed to {} {}: {}", method, url, - curl_easy_strerror(curl_res)); + auto errbuff = curl.get_errbuf(); + if (errbuff[0] == '\0') { + logger->error("Failed to {} {}: {}", + method, url, curl_easy_strerror(curl_res)); + } else { + logger->error("Failed to {} {}: {} (errbuf={})", + method, url, curl_easy_strerror(curl_res), errbuff); + } + switch (curl_res) { case CURLE_COULDNT_CONNECT: entry.set_error("connection_error"); @@ -241,6 +268,7 @@ auto HttpClient::perform(const char* method, const std::string& url, default: entry.set_error("unknown"); } + auto elapsed = absl::Now() - entry.start(); // retry connect timeouts if possible, not read timeouts logger->info( @@ -248,11 +276,11 @@ auto HttpClient::perform(const char* method, const std::string& url, absl::ToInt64Milliseconds(elapsed), absl::ToInt64Milliseconds(config_.connect_timeout), absl::ToInt64Milliseconds(total_timeout - config_.connect_timeout)); + if (elapsed < total_timeout && attempt_number < 2) { entry.set_attempt(attempt_number, false); entry.log(config_.status_metrics_enabled); - return perform(method, url, std::move(headers), payload, size, - attempt_number + 1); + return perform(method, url, std::move(headers), payload, size, attempt_number + 1); } http_code = -1; @@ -260,11 +288,13 @@ auto HttpClient::perform(const char* method, const std::string& url, } else { http_code = curl.status_code(); entry.set_status_code(http_code); + if (http_code / 100 == 2) { entry.set_success(); } else { entry.set_error("http_error"); } + if (is_retryable_error(http_code) && attempt_number < 2) { logger->info("Got a retryable http code from {}: {} (attempt {})", url, http_code, attempt_number); @@ -272,8 +302,7 @@ auto HttpClient::perform(const char* method, const std::string& url, entry.log(config_.status_metrics_enabled); auto sleep_ms = uint32_t(200) << attempt_number; // 200, 400ms std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); - return perform(method, url, std::move(headers), payload, size, - attempt_number + 1); + return perform(method, url, std::move(headers), payload, size, attempt_number + 1); } logger->debug("{} {} - status code: {}", method, url, http_code); } @@ -296,8 +325,7 @@ auto HttpClient::Post(const std::string& url, const char* content_type, headers->append(content_type); headers->append(kGzipEncoding); - return perform("POST", url, std::move(headers), payload.data, payload.size, - 0); + return perform("POST", url, std::move(headers), payload.data, payload.size, 0); } auto HttpClient::Post(const std::string& url, const char* content_type, @@ -305,25 +333,24 @@ auto HttpClient::Post(const std::string& url, const char* content_type, auto logger = registry_->GetLogger(); auto headers = std::make_shared(); headers->append(content_type); + if (config_.compress) { headers->append(kGzipEncoding); auto compressed_size = compressBound(size) + kGzipHeaderSize; - auto compressed_payload = - std::unique_ptr(new char[compressed_size]); + auto compressed_payload = std::unique_ptr(new char[compressed_size]); auto compress_res = gzip_compress(compressed_payload.get(), &compressed_size, payload, size); + if (compress_res != Z_OK) { logger->error( - "Failed to compress payload: {}, while posting to {} - uncompressed " - "size: {}", + "Failed to compress payload: {}, while posting to {} - uncompressed size: {}", compress_res, url, size); HttpResponse err{}; err.status = -1; return err; } - return perform("POST", url, std::move(headers), compressed_payload.get(), - compressed_size, 0); + return perform("POST", url, std::move(headers), compressed_payload.get(), compressed_size, 0); } // no compression diff --git a/spectator/http_client.h b/spectator/http_client.h index e578394..01d3ac5 100644 --- a/spectator/http_client.h +++ b/spectator/http_client.h @@ -2,6 +2,7 @@ #include "absl/time/time.h" #include "compressed_buffer.h" +#include "metatron/metatron_config.h" #include #include #include @@ -22,6 +23,8 @@ struct HttpClientConfig { bool read_body; bool verbose_requests; bool status_metrics_enabled; + bool external_enabled; + metatron::CertInfo cert_info; }; using HttpHeaders = std::unordered_map; @@ -53,12 +56,10 @@ class HttpClient { [[nodiscard]] auto Get(const std::string& url) const -> HttpResponse; [[nodiscard]] auto Get(const std::string& url, - const std::vector& headers) const - -> HttpResponse; + const std::vector& headers) const -> HttpResponse; [[nodiscard]] auto Put(const std::string& url, - const std::vector& headers) const - -> HttpResponse; + const std::vector& headers) const -> HttpResponse; static void GlobalInit() noexcept; static void GlobalShutdown() noexcept; @@ -72,8 +73,7 @@ class HttpClient { size_t size, int attempt_number) const -> HttpResponse; auto method_header(const char* method, const std::string& url, - const std::vector& headers) const - -> HttpResponse; + const std::vector& headers) const -> HttpResponse; }; } // namespace spectator diff --git a/spectator/http_client_test.cc b/spectator/http_client_test.cc index 4afd8b2..0151e45 100644 --- a/spectator/http_client_test.cc +++ b/spectator/http_client_test.cc @@ -49,8 +49,9 @@ class TestRegistry : public Registry { }; HttpClientConfig get_cfg(int read_to, int connect_to) { + auto cert_info = metatron::CertInfo{"ssl_cert", "ssl_key", "ca_info", "app_name"}; return HttpClientConfig{absl::Milliseconds(connect_to), absl::Milliseconds(read_to), - true, true, true, false, true}; + true, true, true, false, true, false, cert_info}; } TEST(HttpTest, Post) { diff --git a/spectator/publisher.h b/spectator/publisher.h index 2595249..eead721 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -8,6 +8,7 @@ #include "http_client.h" #include "util/logger.h" #include "measurement.h" +#include "metatron/metatron_config.h" #include "smile.h" #include @@ -262,8 +263,10 @@ class Publisher { if (connect_timeout == absl::ZeroDuration()) { connect_timeout = absl::Seconds(2); } - return HttpClientConfig{connect_timeout, read_timeout, true, false, true, cfg.verbose_http, - cfg.status_metrics_enabled}; + auto cert_info = metatron::find_certificate(cfg.external_enabled, cfg.metatron_dir); + return HttpClientConfig{connect_timeout, read_timeout, true, false, true, + cfg.verbose_http, cfg.status_metrics_enabled, + cfg.external_enabled, cert_info}; } auto handle_aggr_response(const HttpResponse& http_response, @@ -331,8 +334,7 @@ class Publisher { auto http_cfg = get_http_config(cfg); auto start = absl::Now(); HttpClient client{registry_, std::move(http_cfg)}; - auto batch_size = - static_cast::difference_type>(cfg.batch_size); + auto batch_size = static_cast::difference_type>(cfg.batch_size); auto measurements = registry_->Measurements(); if (!cfg.is_enabled() || measurements.empty()) { @@ -343,12 +345,13 @@ class Publisher { } if (logger->should_log(spdlog::level::trace)) { - logger->trace("Sending {} measurements to {}", measurements.size(), - registry_->GetConfig().uri); + logger->trace("Sending {} measurements to {}", + measurements.size(), registry_->GetConfig().uri); for (const auto& m : measurements) { logger->trace("{}", m); } } + auto from = measurements.begin(); auto end = measurements.end(); std::vector> responses; @@ -359,9 +362,7 @@ class Publisher { std::transform(buffers_.begin(), buffers_.end(), std::back_inserter(avail_buffers), [](auto& b) { return &b; }); - std::vector< - std::pair> - batches; + std::vector> batches; while (from != end) { auto to_end = std::distance(from, end); @@ -385,8 +386,7 @@ class Publisher { } measurements_to_json(payload, batch.first, batch.second); - auto response = - client.Post(uri, HttpClient::kSmileJson, payload->Result()); + auto response = client.Post(uri, HttpClient::kSmileJson, payload->Result()); { absl::MutexLock lock(&responses_mutex); auto batch_size = batch.second - batch.first;