Skip to content

Commit

Permalink
Pass CCDB Headers together with binary blob
Browse files Browse the repository at this point in the history
  • Loading branch information
shahor02 committed Oct 7, 2024
1 parent 9d39dbc commit 6fb7d5d
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 3 deletions.
15 changes: 15 additions & 0 deletions CCDB/include/CCDB/CcdbApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ class CcdbApi //: public DatabaseInterface
// Loads files from alien and cvmfs into given destination.
bool loadLocalContentToMemory(o2::pmr::vector<char>& dest, std::string& url) const;

// add annotated flattened headers in the end of the blob
static void appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers);

// the failure to load the file to memory is signaled by 0 size and non-0 capacity
static bool isMemoryFileInvalid(const o2::pmr::vector<char>& v) { return v.size() == 0 && v.capacity() > 0; }
template <typename T>
Expand Down Expand Up @@ -610,6 +613,16 @@ class CcdbApi //: public DatabaseInterface
return getSnapshotDir(topdir, path) + '/' + sfile;
}

template <typename MAP> // can be either std::map or std::multimap
static size_t getFlatHeaderSize(const MAP& Headers)
{
size_t hsize = sizeof(int) + sizeof(FlatHeaderAnnot); // annotation size
for (auto& h : Headers) {
hsize += h.first.length() + h.second.length() + 2; // 2*(string_buffer + terminating null character)
}
return hsize;
}

// tmp helper and single point of entry for a CURL perform call
// helps to switch between easy handle perform and multi handles in a single place
CURLcode CURL_perform(CURL* handle) const;
Expand All @@ -632,6 +645,8 @@ class CcdbApi //: public DatabaseInterface
size_t mCurlTimeoutDownload = 15; // download timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD, updated according to the deployment mode
size_t mCurlTimeoutUpload = 15; // upload timeout in seconds, can be configured via ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD, updated according to the deployment mode

static constexpr char FlatHeaderAnnot[] = "$HEADER$"; // annotation for flat header

ClassDefNV(CcdbApi, 1);
};

Expand Down
26 changes: 24 additions & 2 deletions CCDB/src/CcdbApi.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1687,12 +1687,15 @@ void CcdbApi::scheduleDownload(RequestContext& requestContext, size_t* requestCo
ho.counter++;
try {
if (chunk.capacity() < chunk.size() + realsize) {
// estimate headers size when converted to annotated text string
const char hannot[] = "header";
size_t hsize = getFlatHeaderSize(ho.header);
auto cl = ho.header.find("Content-Length");
if (cl != ho.header.end()) {
size_t sizeFromHeader = std::stol(cl->second);
sz = std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
} else {
sz = std::max(chunk.size() * 2, chunk.size() + realsize);
sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
// LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz);
}
chunk.reserve(sz);
Expand Down Expand Up @@ -1885,6 +1888,25 @@ void CcdbApi::loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& p
vectoredLoadFileToMemory(contexts);
}

void CcdbApi::appendFlatHeader(o2::pmr::vector<char>& dest, const std::map<std::string, std::string>& headers)
{
size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
dest.resize(cnt + hsize);
auto addString = [&dest, &cnt](const std::string& s) {
for (char c : s) {
dest[cnt++] = c;
}
dest[cnt++] = 0;
};

for (auto& h : headers) {
addString(h.first);
addString(h.second);
}
*reinterpret_cast<int*>(&dest[cnt]) = hsize; // store size
std::memcpy(&dest[cnt + sizeof(int)], FlatHeaderAnnot, sizeof(FlatHeaderAnnot)); // annotate the flattened headers map
}

void CcdbApi::navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const
{
LOGP(debug, "loadFileToMemory {} ETag=[{}]", requestContext.path, requestContext.etag);
Expand Down
2 changes: 2 additions & 0 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
Expand All @@ -273,6 +274,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/DataRefUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ struct DataRefUtils {
}
// Decode a CCDB object using the CcdbApi.
static void* decodeCCDB(DataRef const& ref, std::type_info const& info);
static std::map<std::string, std::string> extractCCDBHeaders(DataRef const& ref);

static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef& ref)
{
Expand Down
49 changes: 48 additions & 1 deletion Framework/Core/src/DataRefUtils.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// or submit itself to any jurisdiction.

#include <typeinfo>
#include <cstring>
#include "Framework/DataRefUtils.h"
#include "Framework/RuntimeError.h"
#include "Framework/Logger.h"
Expand Down Expand Up @@ -80,7 +81,21 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo)
Int_t previousErrorLevel = gErrorIgnoreLevel;
gErrorIgnoreLevel = kFatal;
auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
TMemFile memFile("name", const_cast<char*>(ref.payload), dh->payloadSize, "READ");
const char* buff = const_cast<char*>(ref.payload);
size_t flSize = dh->payloadSize;
// does it have a flattened headers map attached in the end?
constexpr char FlatHeaderAnnot[] = "$HEADER$";
constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot);
int headerSize = 0;
if (dh->payloadSize >= Offset &&
!std::strncmp(buff + flSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) {
headerSize = *reinterpret_cast<const int*>(buff + flSize - Offset);
}
if (headerSize <= 0) {
LOGP(fatal, "Anomalous flatthened header size {} extracted", headerSize);
}

TMemFile memFile("name", const_cast<char*>(ref.payload), flSize, "READ");
gErrorIgnoreLevel = previousErrorLevel;
if (memFile.IsZombie()) {
return nullptr;
Expand All @@ -94,4 +109,36 @@ void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo)
return result;
}

std::map<std::string, std::string> DataRefUtils::extractCCDBHeaders(DataRef const& ref)
{
auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
const char* buff = const_cast<char*>(ref.payload);
size_t flSize = dh->payloadSize;
// does it have a flattened headers map attached in the end?
constexpr char FlatHeaderAnnot[] = "$HEADER$";
constexpr int Offset = sizeof(int) + sizeof(FlatHeaderAnnot);
int headerSize = 0, ss0 = 0;
if (dh->payloadSize >= Offset && !std::strncmp(buff + flSize - sizeof(FlatHeaderAnnot), FlatHeaderAnnot, sizeof(FlatHeaderAnnot))) {
headerSize = *reinterpret_cast<const int*>(buff + flSize - Offset);
}
if (headerSize <= 0) {
LOGP(fatal, "Anomalous flatthened header size {} extracted", headerSize);
}
buff += flSize - headerSize; // jump to the start of flattened header
headerSize -= Offset;
const char* str0 = &buff[ss0++];
std::map<std::string, std::string> res;
while (ss0 < headerSize) {
if (buff[ss0++] == 0) {
if (!str0) {
str0 = &buff[ss0]; // new key string is found
} else {
res.emplace(std::string(str0), std::string(&buff[ss0])); // new value string found, add key value to the map
str0 = nullptr;
}
}
}
return res;
}

} // namespace o2::framework

0 comments on commit 6fb7d5d

Please sign in to comment.