diff --git a/common/thorhelper/thorfile.cpp b/common/thorhelper/thorfile.cpp index d27e6282dc1..1e31d971aa9 100644 --- a/common/thorhelper/thorfile.cpp +++ b/common/thorhelper/thorfile.cpp @@ -109,7 +109,7 @@ IHThorDiskReadArg * createWorkUnitReadArg(const char * filename, IHThorWorkunitR #define MAX_FILE_READ_FAIL_COUNT 3 -IKeyIndex *openKeyFile(IDistributedFilePart & keyFile) +IKeyIndex *openKeyFile(IDistributedFilePart & keyFile, size32_t blockedIndexIOSize) /* blockedIndexIOSize: 0 passes the opened file straight to createKeyIndex(); non-zero wraps it with createBlockedIO() using that block size first */ { unsigned failcount = 0; unsigned numCopies = keyFile.numCopies(); @@ -128,7 +128,13 @@ IKeyIndex *openKeyFile(IDistributedFilePart & keyFile) rfn.getPath(remotePath); unsigned crc = 0; keyFile.getCrc(crc); - return createKeyIndex(remotePath.str(), crc, false); + Owned<IFile> iFile = createIFile(remotePath.str()); + Owned<IFileIO> iFileIO = iFile->open(IFOread); + if (nullptr == iFileIO) /* checked before wrapping, because *iFileIO is dereferenced below */ + throw makeStringExceptionV(0, "Failed to open index file %s", remotePath.str()); + if (blockedIndexIOSize) + iFileIO.setown(createBlockedIO(iFileIO.getClear(), blockedIndexIOSize)); + return createKeyIndex(remotePath.str(), crc, *iFileIO, (unsigned) -1, false); } } catch (IException *E) diff --git a/common/thorhelper/thorfile.hpp b/common/thorhelper/thorfile.hpp index 28fb5be9827..4f74a4b510d 100644 --- a/common/thorhelper/thorfile.hpp +++ b/common/thorhelper/thorfile.hpp @@ -55,7 +55,7 @@ interface IKeyIndex; THORHELPER_API bool checkIndexMetaInformation(IDistributedFile * file, bool force); THORHELPER_API bool calculateDerivedIndexInformation(DerivedIndexInformation & result, IDistributedFile * file, bool force); THORHELPER_API void mergeDerivedInformation(DerivedIndexInformation & result, const DerivedIndexInformation & other); -THORHELPER_API IKeyIndex *openKeyFile(IDistributedFilePart & keyFile); +THORHELPER_API IKeyIndex *openKeyFile(IDistributedFilePart & keyFile, size32_t blockedIndexIOSize=0); /* parameter name matches the definition in thorfile.cpp */ #endif diff --git a/dali/base/dautils.cpp b/dali/base/dautils.cpp index e92876252b7..b5b4ca134c9 100644 --- a/dali/base/dautils.cpp
+++ b/dali/base/dautils.cpp @@ -103,23 +103,17 @@ void getPlaneHosts(StringArray &hosts, IPropertyTree *plane) } } -constexpr const char * lz_plane_path = "storage/planes[@category='lz']"; - IPropertyTreeIterator * getDropZonePlanesIterator(const char * name) { - StringBuffer xpath(lz_plane_path); - if (!isEmptyString(name)) - xpath.appendf("[@name='%s']", name); - return getGlobalConfigSP()->getElements(xpath); + return getPlanesIterator("lz", name); /* planes with @category 'lz'; an empty name leaves the name filter off */ } IPropertyTree * getDropZonePlane(const char * name) { if (isEmptyString(name)) throw makeStringException(-1, "Drop zone name required"); - StringBuffer xpath(lz_plane_path); - xpath.appendf("[@name='%s']", name); - return getGlobalConfigSP()->getPropTree(xpath); + Owned iter = getDropZonePlanesIterator(name); + return iter->first() ? &iter->get() : nullptr; /* first plane the iterator yields, or nullptr when it yields none */ } IPropertyTree * findPlane(const char *category, const char * path, const char * host, bool ipMatch, bool mustMatch) diff --git a/ecl/hthor/hthorkey.cpp b/ecl/hthor/hthorkey.cpp index 82e1ede5d56..9f5b46155ad 100644 --- a/ecl/hthor/hthorkey.cpp +++ b/ecl/hthor/hthorkey.cpp @@ -594,7 +594,15 @@ void CHThorIndexReadActivityBase::killPart() bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart) { - keyIndex.setown(openKeyFile(df->queryPart(whichPart))); + IDistributedFilePart &part = df->queryPart(whichPart); + size32_t blockedSize = 0; /* 0 means openKeyFile() does not wrap the file in blocked IO */ + if (!helper.hasSegmentMonitors()) // unfiltered + { + StringBuffer planeName; + df->getClusterName(part.copyClusterNum(0), planeName); + blockedSize = getBlockedFileIOSize(planeName); /* size held for this plane name, or 0 when the name is not in the map */ + } + keyIndex.setown(openKeyFile(part, blockedSize)); if(df->numParts() == 1) verifyIndex(keyIndex); initPart(); diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index 9c2cd70f594..ec1cf9bdc0f 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -544,6 +544,11 @@ "waitForMount": { "type": "boolean" }, + "blockedFileIOKB": { + "description": "Optimal block size for efficient reading
from this plane. Implementations will use if they can", + "type": "integer", + "default": 0 + }, "components": {}, "prefix": {}, "subPath": {}, diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index 8c7df09b83f..ceac0fb3391 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7568,6 +7568,16 @@ IPropertyTree * getRemoteStorage(const char * name) return global->getPropTree(xpath); } +IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name) /* returns the storage/planes elements of the global config, optionally filtered by @category and/or @name */ +{ + StringBuffer xpath("storage/planes"); + if (!isEmptyString(category)) /* null or empty category: no category filter */ + xpath.appendf("[@category='%s']", category); + if (!isEmptyString(name)) /* null or empty name: no name filter */ + xpath.appendf("[@name='%s']", name); /* NOTE(review): category and name are inserted unescaped; confirm callers never pass a value containing a quote */ + return getGlobalConfigSP()->getElements(xpath); +} + IAPICopyClient * createApiCopyClient(IStorageApiInfo * source, IStorageApiInfo * target) { ReadLockBlock block(containedFileHookLock); @@ -7579,3 +7589,101 @@ IAPICopyClient * createApiCopyClient(IStorageApiInfo * source, IStorageApiInfo * } return nullptr; } + + +// NB: This implementation is not thread-safe.
+// Therefore it should only be used by use cases that are single threaded. It is a read-only IFileIO wrapper: reads of up to blockSize bytes are served from one cached, blockSize-aligned block of the underlying file; larger reads go straight to the underlying IFileIO. +class CBlockedFileIO : public CSimpleInterfaceOf<IFileIO> +{ + Owned<IFileIO> io; + size32_t blockSize = 0; + size32_t readLen = 0; /* number of valid bytes currently held in buffer */ + void *buffer = nullptr; + offset_t lastReadPos = (offset_t)-1; /* file offset of the cached block; -1 until the first block is read */ + MemoryBuffer mb; +public: + CBlockedFileIO(IFileIO *_io, size32_t _blockSize) : io(_io), blockSize(_blockSize) /* NOTE(review): blockSize must be non-zero (read() divides by it); callers in this patch only wrap when the size is non-zero */ + { + buffer = mb.reserveTruncate(blockSize); + } + virtual size32_t read(offset_t pos, size32_t len, void *data) override /* returns the number of bytes copied into data, which is less than len when the underlying reads run out of data */ + { + if (len > blockSize) + return io->read(pos, len, data); /* request larger than one block: bypass the cache */ + size32_t totalCopied = 0; + byte *dest = (byte *) data; + while (len) + { + offset_t readPos = (pos / blockSize) * blockSize; // NB: could be beyond end of file + if (readPos != lastReadPos) + { + readLen = io->read(readPos, blockSize, buffer); // NB: can be less than blockSize (and 0 if beyond end of file) + lastReadPos = readPos; + } + offset_t endPos = readPos+readLen; /* must be 64-bit: a size32_t here wraps for offsets at or beyond 4GB and makes every such read return 0 bytes */ + size32_t copyNow; + if (pos+len <= endPos) // common case hopefully + copyNow = len; + else if (pos < endPos) + copyNow = (size32_t)(endPos-pos); /* always smaller than blockSize, so the narrowing is safe */ + else // nothing to copy + break; + memcpy(dest, ((byte *)buffer) + (pos-readPos), copyNow); /* offset computed first so no out-of-range intermediate pointer is formed */ + len -= copyNow; + pos += copyNow; + dest += copyNow; + totalCopied += copyNow; + } + return totalCopied; + } + virtual offset_t size() override { return io->size(); } + virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); } + virtual offset_t appendFile(IFile *file, offset_t pos=0, offset_t len=(offset_t)-1) override { throwUnexpected(); } + virtual void setSize(offset_t size) override { throwUnexpected(); } + virtual void flush() override { throwUnexpected(); } + virtual void close() override { io->close(); } + virtual unsigned __int64 getStatistic(StatisticKind kind) override { return io->getStatistic(kind); } +}; + +extern IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize) /* wraps base in a CBlockedFileIO; callers in this patch hand base over via getClear() */ +{ + return new CBlockedFileIO(base, blockSize); +} + +// Cache/update plane index blocked IO
settings +static unsigned planeBlockIOMapCBId = 0; +static std::unordered_map<std::string, size32_t> planeBlockedIOMap; /* plane @name mapped to blocked IO size in bytes; guarded by planeBlockedIOMapCrit */ +static CriticalSection planeBlockedIOMapCrit; +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + auto updateFunc = [&](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration) /* rebuilds the whole map from the current planes; neither argument is used */ + { + CriticalBlock b(planeBlockedIOMapCrit); + planeBlockedIOMap.clear(); + Owned<IPropertyTreeIterator> planesIter = getPlanesIterator(nullptr, nullptr); /* no category or name filter: every plane */ + ForEach(*planesIter) + { + const IPropertyTree &plane = planesIter->query(); + size32_t blockedFileIOSize = plane.getPropInt("@blockedFileIOKB") * 1024; /* configured in KB, stored in bytes */ + planeBlockedIOMap[plane.queryProp("@name")] = blockedFileIOSize; /* NOTE(review): assumes every plane has an @name; a null name would be undefined behaviour as a std::string key */ + } + }; + planeBlockIOMapCBId = installConfigUpdateHook(updateFunc, true); + return true; +} + +MODULE_EXIT() +{ + removeConfigUpdateHook(planeBlockIOMapCBId); +} + + +size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize) /* returns the size stored for planeName, or defaultSize when the name is null or not in the map */ +{ + CriticalBlock b(planeBlockedIOMapCrit); + auto it = planeName ? planeBlockedIOMap.find(planeName) : planeBlockedIOMap.end(); /* a null name cannot be converted to a std::string key, so treat it as not found */ + if (it != planeBlockedIOMap.end()) + return it->second; + else + return defaultSize; +} diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index 788425bc6f5..f391982c4ce 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -774,8 +774,14 @@ jlib_decl IFileEventWatcher *createFileEventWatcher(FileWatchFunc callback); //---- Storage plane related functions ---------------------------------------------------- +interface IPropertyTree; +interface IPropertyTreeIterator; extern jlib_decl IPropertyTree * getHostGroup(const char * name, bool required); extern jlib_decl IPropertyTree * getStoragePlane(const char * name); extern jlib_decl IPropertyTree * getRemoteStorage(const char * name); +extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name); + +extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize); +extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize=0); #endif diff
--git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index 6616fd01cda..b1f0dc01814 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -9089,10 +9089,14 @@ jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char return loadConfiguration(componentDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute, monitor); } -void replaceComponentConfig(IPropertyTree *newComponentConfig) +void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig) /* replaces both the component and the global configuration, then runs the config update callbacks */ { - CriticalBlock b(configCS); - componentConfiguration.set(newComponentConfig); + { + CriticalBlock b(configCS); /* both trees are replaced under the same lock */ + componentConfiguration.set(newComponentConfig); + globalConfiguration.set(newGlobalConfig); + } + executeConfigUpdaterCallbacks(); /* called after the scope holding configCS has closed */ } class CYAMLBufferReader : public CInterfaceOf diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 4388003eb4d..ac239602056 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -319,7 +319,7 @@ jlib_decl void mergeConfiguration(IPropertyTree & target, const IPropertyTree & jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list ignoreOptions = {}); jlib_decl IPropertyTree * loadConfiguration(IPropertyTree * defaultConfig, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true); jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true); -jlib_decl void replaceComponentConfig(IPropertyTree *newComponentConfig); +jlib_decl void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig); /* both trees are replaced together; see jptree.cpp */ jlib_decl IPropertyTree *
getCostsConfiguration(); //The following can only be called after loadConfiguration has been called. All components must call loadConfiguration(). diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index 4e781d124d8..6ee37516bcd 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -308,7 +308,14 @@ class CIndexReadSlaveBase : public CSlaveActivity // local key handling - lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics)); + size32_t blockedSize = 0; /* 0 is the lookupIFileIO() default, i.e. no blocked IO requested */ + if (!helper->hasSegmentMonitors()) // unfiltered + { + StringBuffer planeName; + part.queryOwner().getClusterLabel(0, planeName); + blockedSize = getBlockedFileIOSize(planeName); /* size held for this plane name, or 0 when the name is not in the map */ + } + lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize)); RemoteFilename rfn; part.getFilename(0, rfn); diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index 234756587a8..87236f09fbb 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -1227,7 +1227,7 @@ interface IExpander; interface IThorFileCache : extends IInterface { virtual bool remove(const char *filename, unsigned crc) = 0; - virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr, const StatisticsMapping & _statMapping=diskLocalStatistics) = 0; + virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander=nullptr, const StatisticsMapping & _statMapping=diskLocalStatistics, size32_t blockedFileIOSize=0) = 0; /* blockedFileIOSize defaults to 0, so existing callers are unaffected */ }; class graph_decl CThorResourceBase : implements IThorResource, public CInterface diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index
c5e4424552b..16d5d5121cd 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -2044,6 +2044,7 @@ class CLazyFileIO : public CInterfaceOf Owned iFileIO; // real IFileIO CActivityBase *activity = nullptr; StringAttr filename, id; + size32_t blockedFileIOSize = 0; /* 0 = blocked IO disabled for this file */ IFileIO *getFileIO() { @@ -2056,9 +2057,15 @@ class CLazyFileIO : public CInterfaceOf return iFileIO.getClear(); } public: - CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander, const StatisticsMapping & _statMapping=diskLocalStatistics) - : cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(_statMapping) + CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander, const StatisticsMapping & _statMapping, size32_t _blockedFileIOSize) /* _statMapping no longer has a default; _blockedFileIOSize of 0 disables blocked IO */ + : cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), + fileStats(_statMapping), blockedFileIOSize(_blockedFileIOSize) { + if (blockedFileIOSize) // enabled + { + if (compressed || expander) /* a requested block size is dropped whenever the file is compressed or an expander is supplied */ + blockedFileIOSize = 0; // ignore. Compressed files use their own blocked format, but may want to revisit this area.
+ } } virtual void beforeDispose() override; void setActivity(CActivityBase *_activity) @@ -2190,7 +2197,7 @@ class CFileCache : public CSimpleInterfaceOf CriticalBlock b(crit); return _remove(id); } - virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander, const StatisticsMapping & _statMapping) override + virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander, const StatisticsMapping & _statMapping, size32_t blockedFileIOSize) override { StringBuffer filename; RemoteFilename rfn; @@ -2200,13 +2207,15 @@ class CFileCache : public CSimpleInterfaceOf unsigned crc = partDesc.queryProperties().getPropInt("@fileCrc"); if (crc) id.append(crc); + if (blockedFileIOSize) + id.append('_').append(blockedFileIOSize); /* id is the cache lookup key, so a blocked reader gets a different entry from an unblocked one */ CriticalBlock b(crit); CLazyFileIO * file = files.find(id); if (!file || !file->isAliveAndLink()) { Owned repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc); bool compressed = partDesc.queryOwner().isCompressed(); - file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander, _statMapping); + file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander, _statMapping, blockedFileIOSize); files.replace(* file); // NB: files does not own 'file', CLazyFileIO will remove itself from cache on destruction /* NB: there will be 1 CLazyFileIO per physical file part name @@ -2246,7 +2255,11 @@ IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity) else if (compressed) iFileIO.setown(createCompressedFileReader(iFile)); else + { iFileIO.setown(iFile->open(IFOread)); + if (blockedFileIOSize && iFileIO.get()) /* only wrap a successfully opened file; wrapping a null would hide the failure from the check below */ + iFileIO.setown(createBlockedIO(iFileIO.getClear(), blockedFileIOSize)); + } if (!iFileIO.get()) throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get()); } diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index
5df2a826a16..0133c5e6393 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -360,6 +360,7 @@ class CRegistryServer : public CSimpleInterface msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR); processGroup->serialize(msg); globals->serialize(msg); + getGlobalConfigSP()->serialize(msg); /* sent straight after globals; RegisterSelf() in thslavemain.cpp reads it as masterGlobalConfig */ msg.append(masterSlaveMpTag); msg.append(kjServiceMpTag); if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) diff --git a/thorlcr/slave/thslavemain.cpp b/thorlcr/slave/thslavemain.cpp index e918a35dd6f..a5196cd650a 100644 --- a/thorlcr/slave/thslavemain.cpp +++ b/thorlcr/slave/thslavemain.cpp @@ -113,21 +113,22 @@ static bool RegisterSelf(SocketEndpoint &masterEp) msg.read(vmajor); msg.read(vminor); Owned processGroup = deserializeIGroup(msg); + Owned masterComponentConfig = createPTree(msg); /* read order must match the master's serialize order: group, globals, global config */ + Owned masterGlobalConfig = createPTree(msg); mySlaveNum = (unsigned)processGroup->rank(queryMyNode()); assertex(NotFound != mySlaveNum); mySlaveNum++; // 1 based; unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound); - Owned masterComponentConfig = createPTree(msg); if (NotFound == configSlaveNum) globals->setPropInt("@slavenum", mySlaveNum); else assertex(mySlaveNum == configSlaveNum); - Owned mergedGlobals = createPTreeFromIPT(globals); - mergeConfiguration(*mergedGlobals, *masterComponentConfig); - replaceComponentConfig(mergedGlobals); - globals.set(mergedGlobals); + Owned mergedComponentConfig = createPTreeFromIPT(globals); /* start from a copy of this slave's own globals */ + mergeConfiguration(*mergedComponentConfig, *masterComponentConfig); + replaceComponentConfig(mergedComponentConfig, masterGlobalConfig); /* installs the merged component config and the master's global config together */ + globals.set(mergedComponentConfig); #ifdef _DEBUG unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);