diff --git a/CCDB/include/CCDB/CcdbApi.h b/CCDB/include/CCDB/CcdbApi.h index ea396a5b49402..9ba8869fb7de3 100644 --- a/CCDB/include/CCDB/CcdbApi.h +++ b/CCDB/include/CCDB/CcdbApi.h @@ -392,6 +392,9 @@ class CcdbApi //: public DatabaseInterface // Loads files from alien and cvmfs into given destination. bool loadLocalContentToMemory(o2::pmr::vector& dest, std::string& url) const; + // add annotated flattened headers in the end of the blob + static void appendFlatHeader(o2::pmr::vector& dest, const std::map& headers); + // the failure to load the file to memory is signaled by 0 size and non-0 capacity static bool isMemoryFileInvalid(const o2::pmr::vector& v) { return v.size() == 0 && v.capacity() > 0; } template @@ -610,6 +613,16 @@ class CcdbApi //: public DatabaseInterface return getSnapshotDir(topdir, path) + '/' + sfile; } + template // can be either std::map or std::multimap + static size_t getFlatHeaderSize(const MAP& Headers) + { + size_t hsize = sizeof(int) + sizeof(FlatHeaderAnnot); // annotation size + for (auto& h : Headers) { + hsize += h.first.length() + h.second.length() + 2; // 2*(string_buffer + terminating null character) + } + return hsize; + } + // tmp helper and single point of entry for a CURL perform call // helps to switch between easy handle perform and multi handles in a single place CURLcode CURL_perform(CURL* handle) const; @@ -632,6 +645,8 @@ class CcdbApi //: public DatabaseInterface size_t mCurlTimeoutDownload = 15; // download timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD, updated according to the deployment mode size_t mCurlTimeoutUpload = 15; // upload timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD, updated according to the deployment mode + static constexpr char FlatHeaderAnnot[] = "$HEADER$"; // annotation for flat header + ClassDefNV(CcdbApi, 1); }; diff --git a/CCDB/src/CcdbApi.cxx b/CCDB/src/CcdbApi.cxx index a17458a33e5e6..3b622b87e7e7b 100644 --- a/CCDB/src/CcdbApi.cxx +++ b/CCDB/src/CcdbApi.cxx @@ -1687,12 +1687,15 @@ void CcdbApi::scheduleDownload(RequestContext& requestContext, size_t* requestCo ho.counter++; try { if (chunk.capacity() < chunk.size() + realsize) { + // estimate headers size when converted to annotated text string + const char hannot[] = "header"; + size_t hsize = getFlatHeaderSize(ho.header); auto cl = ho.header.find("Content-Length"); if (cl != ho.header.end()) { size_t sizeFromHeader = std::stol(cl->second); - sz = std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader); + sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader); } else { - sz = std::max(chunk.size() * 2, chunk.size() + realsize); + sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize); // LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz); } chunk.reserve(sz); @@ -1885,6 +1888,25 @@ void CcdbApi::loadFileToMemory(o2::pmr::vector& dest, std::string const& p vectoredLoadFileToMemory(contexts); } +void CcdbApi::appendFlatHeader(o2::pmr::vector& dest, const std::map& headers) +{ + size_t hsize = getFlatHeaderSize(headers), cnt = dest.size(); + dest.resize(cnt + hsize); + auto addString = [&dest, &cnt](const std::string& s) { + for (char c : s) { + dest[cnt++] = c; + } + dest[cnt++] = 0; + }; + + for (auto& h : headers) { + addString(h.first); + addString(h.second); + } + *reinterpret_cast(&dest[cnt]) = hsize; // store size + std::memcpy(&dest[cnt + sizeof(int)], FlatHeaderAnnot, sizeof(FlatHeaderAnnot)); // annotate the flattened headers map +} + void CcdbApi::navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const { LOGP(debug, "loadFileToMemory {} ETag=[{}]", requestContext.path, requestContext.etag); diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index 6c510ad189ac5..ab1a21e263eb0 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -260,6 +260,7 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->mapURL2UUID[path].cacheMiss++; helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); @@ -273,6 +274,7 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->mapURL2UUID[path].cacheMiss++; helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); + api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); @@ -395,6 +397,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); newOrbitResetTime = getOrbitResetTime(v); + api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); @@ -405,6 +408,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize); helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize); newOrbitResetTime = getOrbitResetTime(v); + api.appendFlatHeader(v, headers); auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); diff --git a/Framework/Core/include/Framework/DataRefUtils.h b/Framework/Core/include/Framework/DataRefUtils.h index 264533def326d..c63b06357b8ed 100644 --- a/Framework/Core/include/Framework/DataRefUtils.h +++ b/Framework/Core/include/Framework/DataRefUtils.h @@ -179,6 +179,7 @@ struct DataRefUtils { } // Decode a CCDB object using the CcdbApi. static void* decodeCCDB(DataRef const& ref, std::type_info const& info); + static std::map extractCCDBHeaders(DataRef const& ref); static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef& ref) { diff --git a/Framework/Core/src/DataRefUtils.cxx b/Framework/Core/src/DataRefUtils.cxx index 239aba95aa3ce..b32c47387e69a 100644 --- a/Framework/Core/src/DataRefUtils.cxx +++ b/Framework/Core/src/DataRefUtils.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include +#include #include "Framework/DataRefUtils.h" #include "Framework/RuntimeError.h" #include "Framework/Logger.h" @@ -80,11 +81,29 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo) Int_t previousErrorLevel = gErrorIgnoreLevel; gErrorIgnoreLevel = kFatal; auto* dh = o2::header::get(ref.header); - TMemFile memFile("name", const_cast(ref.payload), dh->payloadSize, "READ"); + const char* buff = const_cast(ref.payload); + size_t flSize = dh->payloadSize; + // does it have a flattened headers map attached in the end? + constexpr char FlatHeaderAnnot[] = "$HEADER$"; + constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot); + int headerSize = 0; + LOGP(debug, "DHPayloadSize={}>{} Ref:{}/{} Cmp {}:{}", dh->payloadSize, Offset, dh->dataOrigin.as(), dh->dataDescription.as(), std::string{buff + dh->payloadSize - sizeof(FlatHeaderAnnot)}, std::string{FlatHeaderAnnot}); + + if (dh->payloadSize >= Offset && + !std::strncmp(buff + dh->payloadSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) { + headerSize = *reinterpret_cast(buff + dh->payloadSize - Offset); + } + if (headerSize <= 0) { + LOGP(fatal, "Anomalous flattened header size {} extracted", headerSize); + } + TMemFile memFile("name", const_cast(ref.payload), dh->payloadSize - headerSize, "READ"); gErrorIgnoreLevel = previousErrorLevel; if (memFile.IsZombie()) { return nullptr; } + + extractCCDBHeaders(ref); + TClass* tcl = TClass::GetClass(tinfo); result = extractFromTFile(memFile, tcl, "ccdb_object"); if (!result) { @@ -94,4 +113,36 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo) return result; } +std::map DataRefUtils::extractCCDBHeaders(DataRef const& ref) +{ + auto* dh = o2::header::get(ref.header); + const char* buff = const_cast(ref.payload); + // does it have a flattened headers map attached in the end? + constexpr char FlatHeaderAnnot[] = "$HEADER$"; + constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot); + int headerSize = 0, ss0 = 0; + if (dh->payloadSize >= Offset && !std::strncmp(buff + dh->payloadSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) { + headerSize = *reinterpret_cast(buff + dh->payloadSize - Offset); + } + if (headerSize <= 0) { + LOGP(fatal, "Anomalous flattened header size {} extracted", headerSize); + } + buff += dh->payloadSize - headerSize; // jump to the start of flattened header + headerSize -= Offset; + const char* str0 = &buff[ss0++]; + std::map res; + while (ss0 < headerSize) { + if (buff[ss0++] == 0) { + if (!str0) { + str0 = &buff[ss0]; // new key string is found + } else { + res.emplace(std::string(str0), std::string(&buff[ss0])); // new value string found, add key value to the map + LOGP(debug, "Header{} {}:{}", res.size(), std::string(str0), std::string(&buff[ss0])); + str0 = nullptr; + } + } + } + return res; +} + } // namespace o2::framework