Skip to content

Commit

Permalink
HPCC-29332 WIP
Browse files Browse the repository at this point in the history
- Removes system/tracing/tracemanager

Signed-off-by: Rodrigo Pastrana <[email protected]>
  • Loading branch information
rpastrana committed Jul 30, 2023
1 parent 5451509 commit 0fee5c2
Show file tree
Hide file tree
Showing 15 changed files with 902 additions and 521 deletions.
38 changes: 37 additions & 1 deletion esp/bindings/http/platform/httpservice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,34 @@ void checkSetCORSAllowOrigin(EspHttpBinding *binding, CHttpRequest *req, CHttpRe

int CEspHttpServer::processRequest()
{
TraceManager traceManager("esp"); //we'd use an appropriate module/lib name here
auto tracer = traceManager.getTracer();

//Extract parent(caller) context from http header, likely done earlier in the process
//We'd need a setParentContextFromHeaders version supporting httptransport's StringArray m_headers;
//TraceManager::setParentContextFromHeaders(const_cast<std::map<std::string, std::string> &>(request.headers), options);
//or labda function to extract parent context from http header

//Options used to annotate span representing the processing of http requests
opentelemetry::trace::StartSpanOptions options;
options.kind = opentelemetry::trace::SpanKind::kServer;

//Declare the span, provide appropriate attributes, and options
//Trace ID generated if no parent context is provided
auto processingRequestSpan =
tracer->StartSpan("ProcessingHTTPRequest",
{ //Declare whatever span attributes we have at this point
//More can be attached along the way
//{"stype", stype}, //span attributes
{opentelemetry::trace::SemanticConventions::kNetHostPort, "8010"},
//{opentelemetry::trace::SemanticConventions::kHttpMethod, methodName.str()},
//{opentelemetry::trace::SemanticConventions::kRpcService, serviceName.str()},
{opentelemetry::trace::SemanticConventions::kHttpScheme, "http"}},
options); //options.parent is set as parent context for current span

//activate the span
auto scope = tracer->WithActiveSpan(processingRequestSpan);

IEspContext* ctx = m_request->queryContext();
StringBuffer errMessage;
m_request->setPersistentEnabled(m_apport->queryProtocol()->persistentEnabled() && !shouldClose);
Expand Down Expand Up @@ -234,11 +262,14 @@ int CEspHttpServer::processRequest()
StringBuffer serviceName;
StringBuffer methodName;
m_request->getEspPathInfo(stype, &pathEx, &serviceName, &methodName, false);

ESPLOG(LogNormal,"sub service type: %s. parm: %s", getSubServiceDesc(stype), m_request->queryParamStr());

//all thesee attributes could/should be tracked by opentel trace/spans
m_request->updateContext();
ctx->setServiceName(serviceName.str());
ctx->setHTTPMethod(method.str());
processingRequestSpan->SetAttribute(opentelemetry::trace::SemanticConventions::kHttpMethod, method.str());
ctx->setServiceMethod(methodName.str());
ctx->addTraceSummaryValue(LogMin, "app.protocol", method.str(), TXSUMMARY_GRP_ENTERPRISE);
ctx->addTraceSummaryValue(LogMin, "app.service", serviceName.str(), TXSUMMARY_GRP_ENTERPRISE);
Expand All @@ -257,6 +288,7 @@ int CEspHttpServer::processRequest()
}
ctx->addTraceSummaryValue(LogMin, "custom_fields.URL", url.str(), TXSUMMARY_GRP_ENTERPRISE);

//TraceManager::injectKeyValue(C & carrier, const char * key, const char * val)
m_response->setHeader(HTTP_HEADER_HPCC_GLOBAL_ID, ctx->getGlobalId());

if(strieq(method.str(), OPTIONS_METHOD))
Expand All @@ -268,7 +300,8 @@ int CEspHttpServer::processRequest()
ESPLOG(LogMin, "%s %s, from %s", method.str(), m_request->getPath(pathStr).str(), m_request->getPeer(peerStr).str());
else //user ID is in HTTP header
ESPLOG(LogMin, "%s %s, from %s@%s", method.str(), m_request->getPath(pathStr).str(), userid, m_request->getPeer(peerStr).str());

//checkUserAuth could declare nested span
//and/or declare this as an event on the processingRequestSpan
authState = checkUserAuth();
if ((authState == authTaskDone) || (authState == authFailed))
return 0;
Expand Down Expand Up @@ -450,6 +483,9 @@ int CEspHttpServer::processRequest()
return 0;
}

//need to ensure that the span is ended when out of scope Owend<ISpan> processingRequestSpan?
processingRequestSpan->End();

return 0;
}

Expand Down
1 change: 1 addition & 0 deletions esp/bindings/http/platform/httpservice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "espsession.ipp"
#include "jhash.hpp"
#include "jtrace.hpp"

typedef enum espAuthState_
{
Expand Down
18 changes: 18 additions & 0 deletions esp/platform/espp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "jliball.hpp"
#include "jstats.h"
#include "jutil.hpp"
#include "jtrace.hpp"

//CRT / OS
#ifndef _WIN32
Expand Down Expand Up @@ -426,6 +427,11 @@ int init_main(int argc, const char* argv[])
//save off generated config to register with container. Legacy can always reference the config file, application based ESP needs generated config saved off
Owned<IPropertyTree> appConfig;

TraceManager traceManager("esp");
auto espTracer = traceManager.getTracer();//All instances of this esp process can share the same tracer same as TraceManager::getTracer("esp")
auto rootEspProcspan = espTracer->StartSpan(__func__); //Dummy top level span for the esp process
auto scope = espTracer->WithActiveSpan(rootEspProcspan); //Set the active span. The span will remain active until the returned Scope object is destroyed.

try
{
const char* cfgfile = NULL;
Expand Down Expand Up @@ -483,6 +489,16 @@ int init_main(int argc, const char* argv[])
const char * processName = procpt->queryProp("@name");
setStatisticsComponentName(SCTesp, processName, true);

rootEspProcspan->SetAttribute("procname", processName); //Span can be annotated with attributes
rootEspProcspan->SetAttribute("config", cfgfile);//Span can be annotated with attributes
//rootEspProcspan->SetAttribute("application", application);

std::string traceId;
traceManager.getCurrentTraceId(traceId);

std::string spanId;
traceManager.getCurrentSpanID(spanId);

openEspLogFile(envpt.get(), procpt.get());

DBGLOG("Esp starting %s", hpccBuildInfo.buildTag);
Expand Down Expand Up @@ -511,6 +527,8 @@ int init_main(int argc, const char* argv[])
config.setown(cfg);
abortHandler.setConfig(cfg);
}

rootEspProcspan->End();
}
catch(IException* e)
{
Expand Down
23 changes: 23 additions & 0 deletions system/jlib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ endif(NOT TARGET libbase58)

find_package(yaml CONFIG REQUIRED)

#For http exporter
find_package(Protobuf REQUIRED)
#For http exporter
find_package(CURL REQUIRED)

find_package(opentelemetry-cpp CONFIG REQUIRED)

#add_subdirectory(${HPCC_SOURCE_DIR}/system/tracing/opentel/exporters/jlog ${CMAKE_BINARY_DIR}/system/tracing/opentel/exporters/jlog)

SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${STRICT_CXX_FLAGS}")

set ( SRCS
Expand Down Expand Up @@ -184,6 +193,7 @@ set ( INCLUDES
${HPCC_SOURCE_DIR}/system/security/cryptohelper/digisign.cpp
${HPCC_SOURCE_DIR}/system/security/cryptohelper/pke.cpp
${HPCC_SOURCE_DIR}/system/security/cryptohelper/ske.cpp
#${HPCC_SOURCE_DIR}/system/tracing/tracemanager.hpp
)

set_source_files_properties(jmd5.cpp jsort.cpp PROPERTIES SKIP_UNITY_BUILD_INCLUSION ON)
Expand All @@ -207,6 +217,8 @@ include_directories (
${HPCC_SOURCE_DIR}/system/security/shared
${HPCC_SOURCE_DIR}/system/security/cryptohelper
${HPCC_SOURCE_DIR}/system/httplib
#${HPCC_SOURCE_DIR}/system/tracing
${OPENTELEMETRY_CPP_INCLUDE_DIRS}
${CMAKE_CURRENT_BINARY_DIR} # for generated jelog.h file
${CMAKE_BINARY_DIR}
${CMAKE_BINARY_DIR}/oss
Expand All @@ -230,6 +242,17 @@ if ( ${HAVE_LIBCRYPT} )
target_link_libraries ( jlib crypt)
endif ( ${HAVE_LIBCRYPT} )

target_link_libraries ( jlib
opentelemetry-cpp::api
opentelemetry-cpp::ext
opentelemetry-cpp::sdk
opentelemetry-cpp::logs #might not need
opentelemetry-cpp::ostream_span_exporter
opentelemetry-cpp::metrics #might not need
opentelemetry-cpp::otlp_http_exporter #might not need
opentelemetry-cpp::http_client_curl #might not need
)

IF (USE_OPENSSL)
target_link_libraries ( jlib ${OPENSSL_LIBRARIES})
endif (USE_OPENSSL)
Expand Down
193 changes: 192 additions & 1 deletion system/jlib/jtrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
limitations under the License.
############################################################################## */


#include "platform.h"

#include "jmisc.hpp"
Expand Down Expand Up @@ -44,6 +43,11 @@ LogTrace::LogTrace(const char * globalId)
setGlobalId(globalId);
}

LogTrace::LogTrace()
{
assignLocalId();
}

const char* LogTrace::assignLocalId()
{
localId.set(createUniqueIdString().c_str());
Expand All @@ -69,3 +73,190 @@ const char* LogTrace::queryLocalId() const
{
return localId.get();
}

void TraceManager::initTracer()
{
//Handle HPCC specific tracing configuration here
//Target exporter? exporter connection info?
//Target processor(s)? batch/interactive? prob not
//Target propogator? http? grpc? binary? custom?
//HPCC component tracing switches?
Owned<const IPropertyTree> traceConfig;
try
{
traceConfig.setown(getComponentConfigSP()->getPropTree("tracing"));
}
catch (IException * e)
{
EXCLOG(e);
e->Release();
}

if (traceConfig)
{
Owned<IPropertyTree> exportConfig = traceConfig->getPropTree("exporter");
if (exportConfig)
{
if (exportConfig->getPropBool("OS", false)) //To stdout/err
DBGLOG("Tracing to stdout/err");
else if (exportConfig->getPropBool("OTLP", false))
DBGLOG("Tracing to OTLP");
else if (exportConfig->getPropBool("Jaeger", false))
DBGLOG("Tracing to Jaeger");
else if (exportConfig->getPropBool("Zipkin", false))
DBGLOG("Tracing to Zipkin");
else if (exportConfig->getPropBool("Prometheus", false))
DBGLOG("Tracing to Prometheus");
else if (exportConfig->getPropBool("HPCC", false))
DBGLOG("Tracing to HPCC JLog");
}
}
else
{
using namespace opentelemetry::trace;
//OStream exporter, useful for development and debugging tasks and simplest to set up.
auto exporter = opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create();

//SimpleSpanProcesser sends spans one by one to an exporter.
//We could use a batchspanprocessor, which will group several spans together, before sending them to an exporter.
auto processor = opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor>> processors;
processors.push_back(std::move(processor));

// Default is an always-on sampler.
std::shared_ptr<opentelemetry::sdk::trace::TracerContext> context =
opentelemetry::sdk::trace::TracerContextFactory::Create(std::move(processors));
std::shared_ptr<opentelemetry::trace::TracerProvider> provider =
opentelemetry::sdk::trace::TracerProviderFactory::Create(context);

// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);

// set global propagator
// Injects Context into and extracts it from carriers that travel in-band
// across process boundaries. Encoding is expected to conform to the HTTP
// Header Field semantics.
// Values are often encoded as RPC/HTTP request headers.
opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
opentelemetry::nostd::shared_ptr<opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));
}
}

void TraceManager::cleanupTracer()
{
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
opentelemetry::trace::Provider::SetTracerProvider(none);
}

//convenience non-static method to get the default tracer, uses stored tracer/module name
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> TraceManager::getTracer()
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
return provider->GetTracer(tracerName); // (library_name [,library_version][,schema_url])
}

//convenience Static method to get the default tracer, uses provided module name
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> TraceManager::getTracer(std::string moduleName)
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
return provider->GetTracer(moduleName); // (library_name [,library_version][,schema_url])
}

void TraceManager::getCallerSpanId(context::propagation::TextMapCarrier &carrier, std::string & callerSpanId)
{
callerSpanId.clear();

// Inject current context into http header
auto currentCtx = context::RuntimeContext::GetCurrent();
auto propagator = context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
propagator->Inject(carrier, currentCtx);

// Extract parent span context from the TextMapCarrier
auto parentSpanContext = propagator->Extract(carrier,currentCtx);

// Get the value of the HPCC-Caller-Id header from the TextMapCarrier
auto callerIdHeader = carrier.Get("traceparent");

callerSpanId = callerIdHeader.data();
}

void TraceManager::getParentSpanId(std::map<std::string, std::string> requestHeaders, std::string & callerSpanId)
{
const HttpTextMapCarrier<std::map<std::string, std::string>> carrier(requestHeaders);
TraceManager::getParentSpanId(carrier, callerSpanId);
}

void TraceManager::getParentSpanId(const HttpTextMapCarrier<std::map<std::string, std::string>> carrier, std::string & callerSpanId)
{
callerSpanId.clear();

auto propagator = context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
auto current_ctx = context::RuntimeContext::GetCurrent();
auto new_context = propagator->Extract(carrier, current_ctx);
auto parentSpan = opentel_trace::GetSpan(new_context)->GetContext();

char span_id[16] = {0};
parentSpan.span_id().ToLowerBase16(span_id);
callerSpanId = std::string(span_id, 16).c_str();
}

void TraceManager::getCurrentTraceId(std::string & traceId) const
{
traceId.clear();

nostd::shared_ptr<opentel_trace::Span> currntSpan =
TraceManager::getTracer(std::string(tracerName))->GetCurrentSpan();

if (currntSpan->IsRecording())
{
auto spanCtx = currntSpan->GetContext();

char trace_id[32] = {0};
spanCtx.trace_id().ToLowerBase16(trace_id);
traceId = std::string(trace_id, 32);
}
}
void TraceManager::getParentContext(std::map<std::string, std::string>& request_headers, opentel_trace::StartSpanOptions & options)
{
const HttpTextMapCarrier<std::map<std::string, std::string>> carrier(request_headers);
auto prop = context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
auto current_ctx = context::RuntimeContext::GetCurrent();
auto new_context = prop->Extract(carrier, current_ctx);
options.parent = opentelemetry::trace::GetSpan(new_context)->GetContext();
}

void TraceManager::getCurrentSpanID(std::string & spanId) const
{
spanId.clear();
nostd::shared_ptr<opentel_trace::Span> currntSpan =
TraceManager::getTracer(std::string(tracerName))->GetCurrentSpan();

if (currntSpan->IsRecording())
{
char span_id[16] = {0};
currntSpan->GetContext().span_id().ToLowerBase16(span_id);

spanId = std::string(span_id, 16);
}
}



static Singleton<TraceManager> traceManager;
MODULE_INIT(INIT_PRIORITY_STANDARD)
{
TraceManager::initTracer(); //Initialize the tracer based on HPCC configuration
return true;
}

MODULE_EXIT()
{
traceManager.destroy();
}

//Name of module/library that is being traced/instrumented
TraceManager * queryTraceManager(const char * moduleName)
{
return new TraceManager(moduleName);
}
Loading

0 comments on commit 0fee5c2

Please sign in to comment.