Skip to content

Commit

Permalink
Ignore exception when create thread in OTLP file exporter.
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Jul 25, 2024
1 parent deed1e3 commit 1075014
Showing 1 changed file with 73 additions and 50 deletions.
123 changes: 73 additions & 50 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep
// clang-format on

#include "opentelemetry/common/macros.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/base64.h"
Expand Down Expand Up @@ -47,6 +48,9 @@
#include <thread>
#include <utility>
#include <vector>
#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

#if !defined(__CYGWIN__) && defined(_WIN32)
# ifndef WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -1424,71 +1428,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
return;
}

std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
return;
}

std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;
#endif

while (true)
std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
{
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}
return;
}

if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
break;
}
std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;

while (true)
{
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}

{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
break;
}

if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}

concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}
{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
}

concurrency_file->background_thread_waiter_cv.notify_all();
}
if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}

concurrency_file->background_thread_waiter_cv.notify_all();
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: "
<< e.what() << ".Data writing may experience some delays.");
}
catch (...)
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may "
"experience some delays.");
}
#endif
}

private:
Expand Down

0 comments on commit 1075014

Please sign in to comment.