diff --git a/esp/bindings/http/platform/httpservice.cpp b/esp/bindings/http/platform/httpservice.cpp index ea3cbb1f3e2..7d5919b7fde 100644 --- a/esp/bindings/http/platform/httpservice.cpp +++ b/esp/bindings/http/platform/httpservice.cpp @@ -184,6 +184,48 @@ void checkSetCORSAllowOrigin(EspHttpBinding *binding, CHttpRequest *req, CHttpRe int CEspHttpServer::processRequest() { + //Mock http headers from request + Owned mockHTTPHeaders = createProperties(); + + //The traceparent header uses the version-trace_id-parent_id-trace_flags format where: + //version is always 00. trace_id is a hex-encoded trace id. span_id is a hex-encoded span id. trace_flags is a hex-encoded 8-bit field that contains tracing flags such as sampling, trace level, etc. + mockHTTPHeaders->setProp(/*opentelemetry::trace::propagation::kTraceParent*/"traceparent", "00-beca49ca8f3138a2842e5cf21402bfff-4b960b3e4647da3f-01"); + mockHTTPHeaders->setProp(/*opentelemetry::trace::propagation::kTraceState*/"tracestate", "IncomingUGID"); //opentelemetry::trace::propagation::kTraceState + mockHTTPHeaders->setProp(HPCCSemanticConventions::kGLOBALIDHTTPHeader, "IncomingUGID"); + mockHTTPHeaders->setProp(HPCCSemanticConventions::kCallerIdHTTPHeader, "IncomingCID"); + + StringArray mockHTTPHeadersSA; + //mock opentel traceparent context + mockHTTPHeadersSA.append("traceparent:00-beca49ca8f3138a2842e5cf21402bfff-4b960b3e4647da3f-01"); + //mock opentel tracestate https://www.w3.org/TR/trace-context/#trace-context-http-headers-format + mockHTTPHeadersSA.append("tracestate:hpcc=4b960b3e4647da3f"); + mockHTTPHeadersSA.append("HPCC-Global-Id:someGlobalID"); + mockHTTPHeadersSA.append("HPCC-Caller-Id:IncomingCID"); + + Owned serverSpan = queryTraceManager().createServerSpan("rootProcessingHTTPRequest", mockHTTPHeadersSA); + serverSpan->setSpanAttribute("http.request_port", "8010"); + serverSpan->setSpanAttribute("app.name", "esp"); + serverSpan->setSpanAttribute("app.version", "1.0.0"); + serverSpan->setSpanAttribute("app.instance", "esp1"); + serverSpan->setSpanAttribute("http.method", m_request->queryMethod()); + serverSpan->setSpanAttribute("http.url", m_request->queryPath()); + serverSpan->setSpanAttribute("http.host", m_request->queryHost()); + + { + //Mock http headers from request + Owned mockClientContext = createProperties(); + serverSpan->createClientSpan("MockExternalCall")->injectSpanContext(mockClientContext); + + // Print out mockClientContext + Owned iter(mockClientContext->getIterator()); + for(iter->first(); iter->isValid(); iter->next()) + { + const char* key = iter->getPropKey(); + const char* value = mockClientContext->queryProp(key); + DBGLOG("MockClientContext: %s=%s", key, value); + } + } + IEspContext* ctx = m_request->queryContext(); StringBuffer errMessage; m_request->setPersistentEnabled(m_apport->queryProtocol()->persistentEnabled() && !shouldClose); @@ -236,9 +278,12 @@ int CEspHttpServer::processRequest() m_request->getEspPathInfo(stype, &pathEx, &serviceName, &methodName, false); ESPLOG(LogNormal,"sub service type: %s. parm: %s", getSubServiceDesc(stype), m_request->queryParamStr()); +//all thesee attributes could/should be tracked by opentel trace/spans m_request->updateContext(); ctx->setServiceName(serviceName.str()); ctx->setHTTPMethod(method.str()); +//remove before checkin + serverSpan->setSpanAttribute("http.method", method.str()); ctx->setServiceMethod(methodName.str()); ctx->addTraceSummaryValue(LogMin, "app.protocol", method.str(), TXSUMMARY_GRP_ENTERPRISE); ctx->addTraceSummaryValue(LogMin, "app.service", serviceName.str(), TXSUMMARY_GRP_ENTERPRISE); @@ -268,8 +313,16 @@ int CEspHttpServer::processRequest() ESPLOG(LogMin, "%s %s, from %s", method.str(), m_request->getPath(pathStr).str(), m_request->getPeer(peerStr).str()); else //user ID is in HTTP header ESPLOG(LogMin, "%s %s, from %s@%s", method.str(), m_request->getPath(pathStr).str(), userid, m_request->getPeer(peerStr).str()); - +//checkUserAuth could declare nested span +//and/or declare this as an event on reqProcessSpan +{ + Owned userAuthSpan = serverSpan->createInternalSpan("userAuthSpan"); authState = checkUserAuth(); + + StringBuffer authStateStr; + userAuthSpan->toString(authStateStr); + fprintf(stdout, "%s\n", authStateStr.str()); +} if ((authState == authTaskDone) || (authState == authFailed)) return 0; diff --git a/system/CMakeLists.txt b/system/CMakeLists.txt index e9c18182a4b..1aab283f26d 100644 --- a/system/CMakeLists.txt +++ b/system/CMakeLists.txt @@ -26,7 +26,7 @@ if (NOT JLIB_ONLY) HPCC_ADD_SUBDIRECTORY (xmllib) HPCC_ADD_SUBDIRECTORY (xmllibtest "PLATFORM") HPCC_ADD_SUBDIRECTORY (masking) - + if (USE_AERON) project (aeron_include) SET(CMAKE_UNITY_BUILD FALSE) diff --git a/system/jlib/CMakeLists.txt b/system/jlib/CMakeLists.txt index e6ed9160474..b36d9aa00b4 100644 --- a/system/jlib/CMakeLists.txt +++ b/system/jlib/CMakeLists.txt @@ -42,6 +42,12 @@ endif(NOT TARGET libbase58) find_package(yaml CONFIG REQUIRED) +#For OpenTel exporter +find_package(Protobuf REQUIRED) +#For http exporter +#find_package(CURL REQUIRED) +find_package(opentelemetry-cpp CONFIG REQUIRED) + SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${STRICT_CXX_FLAGS}") set ( SRCS @@ -207,6 +213,7 @@ include_directories ( ${HPCC_SOURCE_DIR}/system/security/shared ${HPCC_SOURCE_DIR}/system/security/cryptohelper ${HPCC_SOURCE_DIR}/system/httplib + ${OPENTELEMETRY_CPP_INCLUDE_DIRS} ${CMAKE_CURRENT_BINARY_DIR} # for generated jelog.h file ${CMAKE_BINARY_DIR} ${CMAKE_BINARY_DIR}/oss @@ -230,6 +237,42 @@ if ( ${HAVE_LIBCRYPT} ) target_link_libraries ( jlib crypt) endif ( ${HAVE_LIBCRYPT} ) +find_package(gRPC CONFIG REQUIRED) + +target_link_libraries ( jlib + opentelemetry-cpp::api + opentelemetry-cpp::ext + opentelemetry-cpp::sdk + opentelemetry-cpp::ostream_span_exporter + # opentelemetry-cpp::api - Imported target of opentelemetry-cpp::api + # opentelemetry-cpp::sdk - Imported target of opentelemetry-cpp::sdk + # opentelemetry-cpp::ext - Imported target of opentelemetry-cpp::ext + # opentelemetry-cpp::version - Imported target of opentelemetry-cpp::version + # opentelemetry-cpp::common - Imported target of opentelemetry-cpp::common + # opentelemetry-cpp::trace - Imported target of opentelemetry-cpp::trace + # opentelemetry-cpp::metrics - Imported target of opentelemetry-cpp::metrics + opentelemetry-cpp::logs # - Imported target of opentelemetry-cpp::logs + opentelemetry-cpp::in_memory_span_exporter # - Imported target of opentelemetry-cpp::in_memory_span_exporter + opentelemetry-cpp::otlp_grpc_client # - Imported target of opentelemetry-cpp::otlp_grpc_client + # opentelemetry-cpp::otlp_recordable - Imported target of opentelemetry-cpp::otlp_recordable + opentelemetry-cpp::otlp_grpc_exporter # - Imported target of opentelemetry-cpp::otlp_grpc_exporter + # opentelemetry-cpp::otlp_grpc_log_record_exporter - Imported target of opentelemetry-cpp::otlp_grpc_log_record_exporter + # opentelemetry-cpp::otlp_grpc_metrics_exporter - Imported target of opentelemetry-cpp::otlp_grpc_metrics_exporter + # opentelemetry-cpp::otlp_http_client - Imported target of opentelemetry-cpp::otlp_http_client + # opentelemetry-cpp::otlp_http_exporter - Imported target of opentelemetry-cpp::otlp_http_exporter + # opentelemetry-cpp::otlp_http_log_record_exporter - Imported target of opentelemetry-cpp::otlp_http_log_record_exporter + # opentelemetry-cpp::otlp_http_metric_exporter - Imported target of opentelemetry-cpp::otlp_http_metric_exporter + opentelemetry-cpp::ostream_log_record_exporter # - Imported target of opentelemetry-cpp::ostream_log_record_exporter + # opentelemetry-cpp::ostream_metrics_exporter - Imported target of opentelemetry-cpp::ostream_metrics_exporter + # opentelemetry-cpp::ostream_span_exporter - Imported target of opentelemetry-cpp::ostream_span_exporter + # opentelemetry-cpp::elasticsearch_log_record_exporter - Imported target of opentelemetry-cpp::elasticsearch_log_record_exporter + # opentelemetry-cpp::etw_exporter - Imported target of opentelemetry-cpp::etw_exporter + # opentelemetry-cpp::zpages - Imported target of opentelemetry-cpp::zpages + # opentelemetry-cpp::http_client_curl - Imported target of opentelemetry-cpp::http_client_curl + # opentelemetry-cpp::opentracing_shim - Imported target of opentelemetry-cpp::opentracing_shim + + ) + IF (USE_OPENSSL) target_link_libraries ( jlib ${OPENSSL_LIBRARIES}) endif (USE_OPENSSL) diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index beafd59809c..8771d121e4c 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -9067,6 +9067,8 @@ jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, con configFileUpdater->init(std::get<0>(result).c_str(), componentDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); if (monitor) configFileUpdater->startMonitoring(); + + initTraceManager(componentTag, componentConfiguration.get()); return componentConfiguration.getLink(); } diff --git a/system/jlib/jtrace.cpp b/system/jlib/jtrace.cpp index 8a486b052e9..301e52c242e 100644 --- a/system/jlib/jtrace.cpp +++ b/system/jlib/jtrace.cpp @@ -16,12 +16,38 @@ ############################################################################## */ -#include "platform.h" +#include "opentelemetry/trace/semantic_conventions.h" //known span defines +#include "opentelemetry/context/propagation/global_propagator.h" // context::propagation::GlobalTextMapPropagator::GetGlobalPropagator +#include "opentelemetry/sdk/trace/tracer_provider_factory.h" //opentelemetry::sdk::trace::TracerProviderFactory::Create(context) +#include "opentelemetry/sdk/trace/tracer_context_factory.h" //opentelemetry::sdk::trace::TracerContextFactory::Create(std::move(processors)); +#include "opentelemetry/sdk/trace/simple_processor_factory.h" +#include "opentelemetry/sdk/trace/batch_span_processor_factory.h" +#include "opentelemetry/exporters/ostream/span_exporter_factory.h"// auto exporter = opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(); +//#define oldForEach ForEach // error: ‘ForEach’ was not declared in this scope +#undef ForEach //opentelemetry defines ForEach +#include "opentelemetry/exporters/memory/in_memory_span_exporter_factory.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" //opentel_trace::propagation::kTraceParent +#undef UNIMPLEMENTED //opentelemetry defines UNIMPLEMENTED +#include "opentelemetry/trace/provider.h" //StartSpanOptions +#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" +#define UNIMPLEMENTED throw makeStringExceptionV(-1, "UNIMPLEMENTED feature at %s(%d)", sanitizeSourceFile(__FILE__), __LINE__) +#define ForEach(i) for((i).first();(i).isValid();(i).next()) + +#include "opentelemetry/exporters/otlp/otlp_grpc_exporter_factory.h" +#include "opentelemetry/exporters/otlp/otlp_http_exporter_options.h" +#include "opentelemetry/exporters/memory/in_memory_span_data.h" + +#include "platform.h" +#include "jlib.hpp" #include "jmisc.hpp" #include "jtrace.hpp" #include "lnuid.h" +namespace context = opentelemetry::context; +namespace nostd = opentelemetry::nostd; +namespace opentel_trace = opentelemetry::trace; + using namespace ln_uid; /* @@ -44,6 +70,11 @@ LogTrace::LogTrace(const char * globalId) setGlobalId(globalId); } +LogTrace::LogTrace() +{ + assignLocalId(); +} + const char* LogTrace::assignLocalId() { localId.set(createUniqueIdString().c_str()); @@ -69,3 +100,617 @@ const char* LogTrace::queryLocalId() const { return localId.get(); } + +class CHPCCHttpTextMapCarrier : public opentelemetry::context::propagation::TextMapCarrier +{ +public: + CHPCCHttpTextMapCarrier(const IProperties * httpHeaders) + { + if (httpHeaders) + { + this->httpHeaders.setown(createProperties()); + Owned iter = httpHeaders->getIterator(); + ForEach(*iter) + { + const char * key = iter->getPropKey(); + const char * val = httpHeaders->queryProp(key); + this->httpHeaders->setProp(key, val); + } + } + } + + CHPCCHttpTextMapCarrier() + { + httpHeaders.setown(createProperties()); + }; + + virtual opentelemetry::nostd::string_view Get(opentelemetry::nostd::string_view key) const noexcept override + { + std::string theKey = key.data(); + + if (theKey.empty()) + return ""; + + Owned iter = httpHeaders->getIterator(); + ForEach(*iter) + { + const char * propKey = iter->getPropKey(); + if (stricmp(propKey, theKey.c_str()) == 0) + return httpHeaders->queryProp(propKey); + } + + return ""; + } + + virtual void Set(opentelemetry::nostd::string_view key, opentelemetry::nostd::string_view value) noexcept override + { + httpHeaders->setProp(std::string(key).c_str(), std::string(value).c_str()); + } + +private: + Owned httpHeaders; +}; + +enum SpanType { ServerSpan, ClientSpan, InternalSpan }; + +class CSpan : public CInterfaceOf +{ +public: + CSpan() : span(nullptr) {}; + ~CSpan() + { + if (span != nullptr) + { + StringBuffer out; + toString(out); + DBGLOG("Span end: (%s)", out.str()); + span->End(); + } + } + + ISpan * createClientSpan(const char * name) override + { + return new CSpan(name, SpanType::ClientSpan, this); + } + + ISpan * createInternalSpan(const char * name) override + { + return new CSpan(name, SpanType::InternalSpan, this); + } + + void toString(StringBuffer & out) override + { + if (span != nullptr) + { + out.append("Name: ").append(name.get()) + .append(" SpanID: ").append(spanID.get()) + .append(" TraceID: ").append(traceID.get()) + .append(" TraceFlags: ").append(traceFlags.get()) + .append(" HPCCGlobalID: ").append(hpccGlobalId.get()) + .append(" HPCCCallerID: ").append(hpccCallerId.get()); + + if (parentSpan != nullptr) + { + out.append(" ParentSpanID: "); + parentSpan->toString(out); + } + } + }; + + void setSpanAttributes(const IProperties * attributes) override + { + Owned iter = attributes->getIterator(); + ForEach(*iter) + { + const char * key = iter->getPropKey(); + if (!isEmptyString(key)) + setSpanAttribute(key, attributes->queryProp(key)); + } + } + + void setSpanAttribute(const char * key, const char * val) override + { + if (span && !isEmptyString(key) && !isEmptyString(val)) + span->SetAttribute(key, val); + } + + void addSpanEvent(const char * eventName) override + { + if (span && !isEmptyString(eventName)) + span->AddEvent(eventName); + } + + void querySpanContextProperties(IProperties * contextProps) override + { + if (span != nullptr && contextProps != nullptr) + { + contextProps->setProp("traceID", traceID.get()); + contextProps->setProp("spanID", spanID.get()); + contextProps->setProp("traceFlags", traceFlags.get()); + contextProps->setProp("hpccGlobalId", hpccGlobalId.get()); + contextProps->setProp("hpccCallerId", hpccCallerId.get()); + } + } + + + /** + * Injects the current span context into the given HTTP text map carrier. + * The carrier is used to propagate the span context across process boundaries. + * + * @param carrier A pointer to the HTTP text map carrier to inject the span context into. + * @return True if the span context was successfully injected, false otherwise. + */ + /*bool injectSpanContext(CHPCCHttpTextMapCarrier * carrier) override + { + if (!carrier) + return false; + + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + + //get current context... + opentelemetry::v1::context::Context currentCtx = opentelemetry::context::RuntimeContext::GetCurrent(); + //and have the propagator inject the ctx into carrier + propagator->Inject(*carrier, currentCtx); + + if (!isEmptyString(hpccGlobalId.get())) + carrier->Set(HPCCSemanticConventions::kGLOBALIDHTTPHeader, hpccGlobalId.get()); + + if (!isEmptyString(hpccCallerId.get())) + carrier->Set(HPCCSemanticConventions::kCallerIdHTTPHeader, hpccCallerId.get()); + + return true; + }*/ + + bool injectSpanContext(IProperties * contextProps) override + { + if (!contextProps || isEmptyString(traceID.get()) || isEmptyString(spanID.get()) || isEmptyString(traceFlags.get())) + return false; + + StringBuffer contextHTTPHeader; + //The traceparent header uses the version-trace_id-parent_id-trace_flags format where: + //version is always 00. trace_id is a hex-encoded trace id. span_id is a hex-encoded span id. trace_flags is a hex-encoded 8-bit field that contains tracing flags such as sampling, trace level, etc. + //Example: "traceparent", "00-beca49ca8f3138a2842e5cf21402bfff-4b960b3e4647da3f-01" + contextHTTPHeader.append("00-").append(traceID.get()).append("-").append(spanID.get()).append(traceFlags.get()); + contextProps->setProp(opentelemetry::trace::propagation::kTraceParent.data(), contextHTTPHeader.str()); + + StringBuffer traceStateHTTPHeader; + traceStateHTTPHeader.append("hpcc=").append(spanID.get()); + + contextProps->setProp(opentelemetry::trace::propagation::kTraceState.data(), contextHTTPHeader.str()); + + if (!isEmptyString(hpccGlobalId.get())) + contextProps->setProp(HPCCSemanticConventions::kGLOBALIDHTTPHeader, hpccGlobalId.get()); + + if (!isEmptyString(hpccCallerId.get())) + contextProps->setProp(HPCCSemanticConventions::kCallerIdHTTPHeader, hpccCallerId.get()); + + return true; + } + + opentelemetry::v1::trace::SpanContext querySpanContext() const + { + if (span != nullptr) + return span->GetContext(); + + return opentelemetry::trace::SpanContext::GetInvalid(); + } + + const char * queryTraceName() const + { + return tracerName.get(); + } + +protected: + CSpan(const char * spanName, SpanType type, CSpan * parent) + { + name.set(spanName); + parentSpan = parent; + if (parentSpan != nullptr) + tracerName.set(parent->queryTraceName()); + this->type = type; + + init(); + } + + CSpan(const char * spanName, SpanType type, const char * tracerName_) + { + name.set(spanName); + parentSpan = nullptr; + tracerName.set(tracerName_); + this->type = type; + } + + void init() + { + switch (type) + { + case SpanType::ServerSpan: + opts.kind = opentelemetry::trace::SpanKind::kServer; + break; + case SpanType::ClientSpan: + opts.kind = opentelemetry::trace::SpanKind::kClient; + break; + case SpanType::InternalSpan: + default: + opts.kind = opentelemetry::trace::SpanKind::kInternal; + break; + } + + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + //what if tracerName is empty? + auto tracer = provider->GetTracer(tracerName.get()); + + if (parentSpan != nullptr) + injectParentSpan(parentSpan); + + span = tracer->StartSpan(name.get(), {}, opts); + + if (span != nullptr) + { + storeSpanContext(); + + StringBuffer out; + toString(out); + DBGLOG("Span start: (%s)", out.str()); + } + } + + void storeSpanContext() + { + if (span != nullptr) + { + storeTraceID(); + storeSpanID(); + storeTraceFlags(); + } + } + + void injectParentSpan(CSpan * parentSpan) + { + if (parentSpan != nullptr) + { + auto parentSpanCtx = parentSpan->querySpanContext(); + if(parentSpanCtx.IsValid()) + opts.parent = parentSpanCtx; + } + } + + void storeTraceID() + { + traceID.clear(); + + if (!span) + return; + + auto spanCtx = span->GetContext(); + if (!spanCtx.IsValid()) + return; + + if (!spanCtx.trace_id().IsValid()) + return; + + char trace_id[32] = {0}; + + spanCtx.trace_id().ToLowerBase16(trace_id); + traceID.set(trace_id, 32); + } + + void storeSpanID() + { + spanID.clear(); + + if (!span) + return; + + char span_id[16] = {0}; + span->GetContext().span_id().ToLowerBase16(span_id); + + spanID.set(span_id, 16); + } + + void storeTraceFlags() + { + traceFlags.clear(); + + if (!span) + return; + + char trace_flags[2] = {0}; + span->GetContext().trace_flags().ToLowerBase16(trace_flags); + + traceFlags.set(trace_flags, 2); + } + + StringAttr name; + StringAttr tracerName; + StringAttr traceFlags; + StringAttr traceID; + StringAttr spanID; + StringAttr hpccGlobalId; + StringAttr hpccCallerId; + + opentelemetry::trace::StartSpanOptions opts; + nostd::shared_ptr span; + CSpan * parentSpan = nullptr; + SpanType type = SpanType::InternalSpan; +}; + +class CServerSpan : public CSpan +{ +private: + void setSpanContext(StringArray & httpHeaders, const char kvDelineator = ':') + { + Owned contextProps = createProperties(); + ForEachItemIn(currentHeaderIndex, httpHeaders) + { + const char* httpHeader = httpHeaders.item(currentHeaderIndex); + if(!httpHeader) + continue; + + const char* delineator = strchr(httpHeader, kvDelineator); + if(delineator == nullptr) + continue; + + StringBuffer key, value; + key.append(delineator - httpHeader, httpHeader); + value.set(delineator + 1); + + contextProps->setProp(key, value); + } + + setSpanContext(contextProps); + } + + void setSpanContext(const IProperties * httpHeaders) + { + if (httpHeaders) + { + // perform any key mapping needed... + //Instrumented http client/server Capitalizes the first letter of the header name + //if (key == opentel_trace::propagation::kTraceParent || key == opentel_trace::propagation::kTraceState ) + // theKey[0] = toupper(theKey[0]); + + hpccGlobalId.set(httpHeaders->queryProp(HPCCSemanticConventions::kGLOBALIDHTTPHeader)); + hpccCallerId.set(httpHeaders->queryProp(HPCCSemanticConventions::kCallerIdHTTPHeader)); + + const CHPCCHttpTextMapCarrier carrier(httpHeaders); + auto globalPropegator = context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + opentelemetry::v1::context::Context currentContext = context::RuntimeContext::GetCurrent(); + //auto inValidCtx = opentelemetry::trace::SpanContext::GetInvalid(); + //auto defSpan = new opentelemetry::v1::trace::Span(); + //auto defSpan = nostd::shared_ptr(new DefaultSpan(inValidCtx)); + //auto defspanctx = new opentelemetry::trace::DefaultSpan(inValidCtx); + //auto defSpan = nostd::shared_ptr(defspanctx); + //opentelemetry::trace::propagation::ExtractContextFromTraceHeaders(); + + //auto c = opentelemetry::trace::propagation::ExtractImpl(carrier); + auto newContext = globalPropegator->Extract(carrier, currentContext); + //auto newContext = globalPropegator->Extract(carrier, inValidCtx); + opentelemetry::v1::nostd::shared_ptr declaredParentSpan = opentelemetry::trace::GetSpan(newContext); + if (declaredParentSpan != nullptr) + opts.parent = declaredParentSpan->GetContext(); + } + + //if (!httpHeaders || !parentContext.IsValid()) + //{ + //generate new context? + //} + } + +public: + CServerSpan(const char * spanName, const char * tracerName_, StringArray & httpHeaders) + : CSpan(spanName, SpanType::ServerSpan, tracerName_) + { + setSpanContext(httpHeaders); + init(); + } + + CServerSpan(const char * spanName, const char * tracerName_, const IProperties * httpHeaders) + : CSpan(spanName, SpanType::ServerSpan, tracerName_) + { + setSpanContext(httpHeaders); + init(); + } +}; + +class CTraceManager : implements ITraceManager, public CInterface +//class CTraceManager : CInterfaceOf +{ +private: + bool enabled = true; + StringAttr moduleName; + + void initTracer(IPropertyTree * traceConfig) + { + try + { +//#ifdef TRACECONFIGDEBUG + if (!traceConfig || !traceConfig->hasProp("tracing")) + { + const char * simulatedGlobalYaml = R"!!(global: + tracing: + enable: true + exporter: + type: OTLP + endpoint: "localhost:4317" + useSslCredentials: true + sslCredentialsCACcert: "ssl-certificate" + processor: + batchSpan: true + simpleSpan: false + )!!"; + Owned testTree = createPTreeFromYAMLString(simulatedGlobalYaml, ipt_none, ptr_ignoreWhiteSpace, nullptr); + traceConfig = testTree->getPropTree("global/tracing"); + } + + StringBuffer xml; + toXML(traceConfig, xml); + DBGLOG("traceConfig tree: %s", xml.str()); +//#endif + + if (traceConfig && traceConfig->getPropBool("@enable", false)) + { + DBGLOG("OpenTel tracing enabled"); + using namespace opentelemetry::trace; + //std::unique_ptr exporter = opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(); + //std::unique_ptr exporter = opentelemetry::sdk::trace::export::NoopSpanExporter; + + //Currently using InMemorySpanExporter as default, until a noop exporter is available + std::shared_ptr data; + std::unique_ptr exporter = opentelemetry::exporter::memory::InMemorySpanExporterFactory::Create(data); + + Owned exportConfig = traceConfig->getPropTree("exporter"); +#ifdef TRACECONFIGDEBUG + StringBuffer xml; + toXML(exportConfig, xml); + DBGLOG("exportConfig tree: %s", xml.str()); +#endif + if (exportConfig) + { + StringBuffer exportType; + exportConfig->getProp("@type", exportType); + DBGLOG("Exporter type: %s", exportType.str()); + + if (!exportType.isEmpty()) + { + if (strcasecmp(exportType.str(), "OS")==0) //To stdout/err + { + exporter = opentelemetry::exporter::trace::OStreamSpanExporterFactory::Create(); + DBGLOG("Tracing to stdout/err..."); + } + else if (strcasecmp(exportType.str(), "OTLP")==0) + { + namespace otlp = opentelemetry::exporter::otlp; + + otlp::OtlpGrpcExporterOptions opts; + StringBuffer endPoint; + exportConfig->getProp("@endpoint", endPoint); + opts.endpoint = endPoint.str(); + + opts.use_ssl_credentials = exportConfig->getPropBool("@useSslCredentials", false); + + if (opts.use_ssl_credentials) + { + StringBuffer sslCACert; + exportConfig->getProp("@sslCredentialsCACcert", sslCACert); + opts.ssl_credentials_cacert_as_string = sslCACert.str(); + } + + exporter = otlp::OtlpGrpcExporterFactory::Create(opts); + DBGLOG("Tracing to OTLP (%s)", endPoint.str()); + } + else if (strcasecmp(exportType.str(), "Prometheus")==0) + DBGLOG("Tracing to Prometheus currently not supported"); + else if (strcasecmp(exportType.str(), "HPCC")==0) + DBGLOG("Tracing to HPCC JLog currently not supported"); + } + else + DBGLOG("Tracing exporter type not specified"); + } + + Owned processorConfig = traceConfig->getPropTree("processor"); + std::unique_ptr processor; + if (exportConfig && exportConfig->getPropBool("@batchSpan", false)) + { + //Groups several spans together, before sending them to an exporter. + opentelemetry::v1::sdk::trace::BatchSpanProcessorOptions options; //size_t max_queue_size = 2048; + //The time interval between two consecutive exports + //std::chrono::milliseconds(5000); + //The maximum batch size of every export. It must be smaller or + //equal to max_queue_size. + //size_t max_export_batch_size = 512 + processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(std::move(exporter), options); + DBGLOG("OpenTel tracing using batch Span Processor"); + } + else + { + //SimpleSpanProcesser sends spans one by one to an exporter. + processor = opentelemetry::sdk::trace::SimpleSpanProcessorFactory::Create(std::move(exporter)); + DBGLOG("OpenTel tracing using Simple Span Processor"); + } + + std::vector> processors; + processors.push_back(std::move(processor)); + + // Default is an always-on sampler. + std::shared_ptr context = + opentelemetry::sdk::trace::TracerContextFactory::Create(std::move(processors)); + std::shared_ptr provider = + opentelemetry::sdk::trace::TracerProviderFactory::Create(context); + + // Set the global trace provider + opentelemetry::trace::Provider::SetTracerProvider(provider); + + // set global propagator + // Injects Context into and extracts it from carriers that travel in-band + // across process boundaries. Encoding is expected to conform to the HTTP + // Header Field semantics. + // Values are often encoded as RPC/HTTP request headers. + opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator( + opentelemetry::nostd::shared_ptr( + new opentelemetry::trace::propagation::HttpTraceContext())); + } + } + catch (IException * e) + { + EXCLOG(e); + e->Release(); + } + } + + void cleanupTracer() + { + std::shared_ptr none; + opentelemetry::trace::Provider::SetTracerProvider(none); + } + + nostd::shared_ptr tracer; + +public: + IMPLEMENT_IINTERFACE; + CTraceManager(const char * componentName, IPropertyTree * traceConfig) + { + moduleName.set(componentName); + initTracer(traceConfig); + + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + tracer = provider->GetTracer(moduleName.get()); + } + + CTraceManager() + { + throw makeStringExceptionV(-1, "TraceManager must be intialized!"); + } + + ISpan * createServerSpan(const char * name, StringArray & httpHeaders) override + { + return new CServerSpan(name, moduleName.get(), httpHeaders); + } + + ISpan * createServerSpan(const char * name, const IProperties * httpHeaders) override + { + return new CServerSpan(name, moduleName.get(), httpHeaders); + } +}; + +static Singleton theTraceManager; + +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + return true; +} + +MODULE_EXIT() +{ + theTraceManager.destroy(); +} + +void initTraceManager(const char * componentName, IPropertyTree * config) +{ + theTraceManager.query([=] () { return new CTraceManager(componentName, config); }); +} + +ITraceManager & queryTraceManager() +{ + return *theTraceManager.query([] () { return new CTraceManager; }); //throws if not initialized +} \ No newline at end of file diff --git a/system/jlib/jtrace.hpp b/system/jlib/jtrace.hpp index 6aa184bbdc0..4dad7cd639c 100644 --- a/system/jlib/jtrace.hpp +++ b/system/jlib/jtrace.hpp @@ -18,6 +18,17 @@ #ifndef JTRACE_HPP #define JTRACE_HPP +/** + * @brief This follows open telemetry's span attribute naming conventions + * Known HPCC span Keys could be added here + * Specialized span keys can also be defined within the scope of a span + */ +namespace HPCCSemanticConventions +{ +static constexpr const char *kGLOBALIDHTTPHeader = "HPCC-Global-Id"; +static constexpr const char *kCallerIdHTTPHeader = "HPCC-Caller-Id"; +} + class jlib_decl LogTrace { private: @@ -25,14 +36,14 @@ class jlib_decl LogTrace StringAttr callerId; StringAttr localId; - StringAttr globalIdHTTPHeaderName = "HPCC-Global-Id"; - StringAttr callerIdHTTPHeaderName = "HPCC-Caller-Id"; + StringAttr globalIdHTTPHeaderName = HPCCSemanticConventions::kGLOBALIDHTTPHeader; + StringAttr callerIdHTTPHeaderName = HPCCSemanticConventions::kCallerIdHTTPHeader; const char* assignLocalId(); public: - LogTrace() {}; + LogTrace(); LogTrace(const char * globalId); const char* queryGlobalId() const; @@ -55,6 +66,33 @@ class jlib_decl LogTrace void setLocalId(const char* id); }; +//class CHPCCHttpTextMapCarrier; + +interface ISpan : extends IInterface +{ + virtual void setSpanAttribute(const char * key, const char * val) = 0; + virtual void setSpanAttributes(const IProperties * attributes) = 0; + virtual void addSpanEvent(const char * eventName) = 0; + + virtual void querySpanContextProperties(IProperties * contextProps) = 0; + //virtual bool injectSpanContext(CHPCCHttpTextMapCarrier * carrier) = 0; + virtual bool injectSpanContext(IProperties * contextProps) = 0; + + virtual void toString(StringBuffer & out) = 0; + + virtual ISpan * createClientSpan(const char * name) = 0; + virtual ISpan * createInternalSpan(const char * name) = 0; +}; + +interface ITraceManager : extends IInterface +{ + virtual ISpan * createServerSpan(const char * name, StringArray & httpHeaders) = 0; + virtual ISpan * createServerSpan(const char * name, const IProperties * httpHeaders) = 0; + }; + +extern jlib_decl void initTraceManager(const char * componentName, IPropertyTree * traceConfig); +extern jlib_decl ITraceManager & queryTraceManager(); + /* To use feature-level tracing flags, protect the tracing with a test such as: diff --git a/vcpkg.json.in b/vcpkg.json.in index fa695654bae..334ae5aab9d 100644 --- a/vcpkg.json.in +++ b/vcpkg.json.in @@ -177,6 +177,14 @@ { "name": "zlib", "platform": "@VCPKG_ZLIB@" + }, + { + "name": "opentelemetry-cpp", + "features": [ + "otlp", + "otlp-http", + "otlp-grpc" + ] } ] } \ No newline at end of file