Skip to content

Commit

Permalink
Merge pull request #17551 from jakesmith/HPCC-28555-indexread-lookahead
Browse files Browse the repository at this point in the history
HPCC-28555 Add blocked reader for unfiltered serial index reading

Reviewed-By: Richard Chapman <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jul 11, 2023
2 parents 8c19186 + 87d12a6 commit 4a0099b
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 28 deletions.
10 changes: 8 additions & 2 deletions common/thorhelper/thorfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ IHThorDiskReadArg * createWorkUnitReadArg(const char * filename, IHThorWorkunitR

#define MAX_FILE_READ_FAIL_COUNT 3

IKeyIndex *openKeyFile(IDistributedFilePart & keyFile)
IKeyIndex *openKeyFile(IDistributedFilePart & keyFile, size32_t blockedIndexIOSize)
{
unsigned failcount = 0;
unsigned numCopies = keyFile.numCopies();
Expand All @@ -128,7 +128,13 @@ IKeyIndex *openKeyFile(IDistributedFilePart & keyFile)
rfn.getPath(remotePath);
unsigned crc = 0;
keyFile.getCrc(crc);
return createKeyIndex(remotePath.str(), crc, false);
Owned<IFile> iFile = createIFile(remotePath.str());
Owned<IFileIO> iFileIO = iFile->open(IFOread);
if (nullptr == iFileIO)
throw makeStringExceptionV(0, "Failed to open index file %s", remotePath.str());
if (blockedIndexIOSize)
iFileIO.setown(createBlockedIO(iFileIO.getClear(), blockedIndexIOSize));
return createKeyIndex(remotePath.str(), crc, *iFileIO, (unsigned) -1, false);
}
}
catch (IException *E)
Expand Down
2 changes: 1 addition & 1 deletion common/thorhelper/thorfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ interface IKeyIndex;
THORHELPER_API bool checkIndexMetaInformation(IDistributedFile * file, bool force);
THORHELPER_API bool calculateDerivedIndexInformation(DerivedIndexInformation & result, IDistributedFile * file, bool force);
THORHELPER_API void mergeDerivedInformation(DerivedIndexInformation & result, const DerivedIndexInformation & other);
THORHELPER_API IKeyIndex *openKeyFile(IDistributedFilePart & keyFile);
THORHELPER_API IKeyIndex *openKeyFile(IDistributedFilePart & keyFile, size32_t blockedIndexFileIOSize=0);


#endif
12 changes: 3 additions & 9 deletions dali/base/dautils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,17 @@ void getPlaneHosts(StringArray &hosts, IPropertyTree *plane)
}
}

constexpr const char * lz_plane_path = "storage/planes[@category='lz']";

IPropertyTreeIterator * getDropZonePlanesIterator(const char * name)
{
StringBuffer xpath(lz_plane_path);
if (!isEmptyString(name))
xpath.appendf("[@name='%s']", name);
return getGlobalConfigSP()->getElements(xpath);
return getPlanesIterator("lz", name);
}

IPropertyTree * getDropZonePlane(const char * name)
{
if (isEmptyString(name))
throw makeStringException(-1, "Drop zone name required");
StringBuffer xpath(lz_plane_path);
xpath.appendf("[@name='%s']", name);
return getGlobalConfigSP()->getPropTree(xpath);
Owned<IPropertyTreeIterator> iter = getDropZonePlanesIterator(name);
return iter->first() ? &iter->get() : nullptr;
}

IPropertyTree * findPlane(const char *category, const char * path, const char * host, bool ipMatch, bool mustMatch)
Expand Down
10 changes: 9 additions & 1 deletion ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,15 @@ void CHThorIndexReadActivityBase::killPart()

bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
{
keyIndex.setown(openKeyFile(df->queryPart(whichPart)));
IDistributedFilePart &part = df->queryPart(whichPart);
size32_t blockedSize = 0;
if (!helper.hasSegmentMonitors()) // unfiltered
{
StringBuffer planeName;
df->getClusterName(part.copyClusterNum(0), planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
keyIndex.setown(openKeyFile(part, blockedSize));
if(df->numParts() == 1)
verifyIndex(keyIndex);
initPart();
Expand Down
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@
"waitForMount": {
"type": "boolean"
},
"blockedFileIOKB": {
"description": "Optimal block size for efficient reading from this plane. Implementations will use if they can",
"type": "integer",
"default": 0
},
"components": {},
"prefix": {},
"subPath": {},
Expand Down
108 changes: 108 additions & 0 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7568,6 +7568,16 @@ IPropertyTree * getRemoteStorage(const char * name)
return global->getPropTree(xpath);
}

// Return an iterator over the storage planes in the global configuration.
// 'category' and 'name' are optional filters: each one that is non-empty adds
// a matching attribute predicate to the xpath; null/empty means "don't filter".
IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name)
{
    const bool filterByCategory = !isEmptyString(category);
    const bool filterByName = !isEmptyString(name);

    StringBuffer planesXPath("storage/planes");
    if (filterByCategory)
        planesXPath.appendf("[@category='%s']", category);
    if (filterByName)
        planesXPath.appendf("[@name='%s']", name);

    return getGlobalConfigSP()->getElements(planesXPath);
}

IAPICopyClient * createApiCopyClient(IStorageApiInfo * source, IStorageApiInfo * target)
{
ReadLockBlock block(containedFileHookLock);
Expand All @@ -7579,3 +7589,101 @@ IAPICopyClient * createApiCopyClient(IStorageApiInfo * source, IStorageApiInfo *
}
return nullptr;
}


// Read-only IFileIO wrapper that serves small reads out of a single cached, block-aligned buffer,
// so that a run of small reads within the same block costs one read of the wrapped IFileIO.
// Reads larger than the block size bypass the cache and are passed straight to the wrapped IFileIO.
// The mutating operations (write, appendFile, setSize, flush) are not supported and throw.
// NB: This implementation is not thread-safe.
// Therefore it should only be used by use cases that are single threaded
class CBlockedFileIO : public CSimpleInterfaceOf<IFileIO>
{
    Owned<IFileIO> io;                      // wrapped IFileIO that performs the real reads
    size32_t blockSize = 0;                 // size of the cached block and alignment of underlying reads
    size32_t readLen = 0;                   // number of valid bytes currently held in 'buffer'
    void *buffer = nullptr;                 // block storage, obtained from 'mb'
    offset_t lastReadPos = (offset_t)-1;    // file offset of the cached block, (offset_t)-1 if none is cached
    MemoryBuffer mb;
public:
    // _io is the IFileIO to wrap, _blockSize the size of the block to read/cache.
    CBlockedFileIO(IFileIO *_io, size32_t _blockSize) : io(_io), blockSize(_blockSize)
    {
        buffer = mb.reserveTruncate(blockSize);
    }
    // Read up to 'len' bytes starting at file offset 'pos' into 'data'.
    // Returns the number of bytes copied, which is less than 'len' (possibly 0) if the
    // underlying reads ran out of data before the request was satisfied.
    virtual size32_t read(offset_t pos, size32_t len, void *data) override
    {
        // Requests bigger than a block gain nothing from the cache
        // NB: this also means a blockSize of 0 never reaches the division below
        if (len > blockSize)
            return io->read(pos, len, data);
        size32_t totalCopied = 0;
        byte *dest = (byte *) data;
        while (len)
        {
            offset_t readPos = (pos / blockSize) * blockSize; // NB: could be beyond end of file
            if (readPos != lastReadPos)
            {
                // Invalidate first, so that the old block is not treated as still cached
                // should the read below throw after touching the buffer
                lastReadPos = (offset_t)-1;
                readLen = io->read(readPos, blockSize, buffer); // NB: can be less than blockSize (and 0 if beyond end of file)
                lastReadPos = readPos;
            }
            // NB: must be a 64bit offset. A 32bit end position wraps once the end of the block
            // reaches 4GB, leaving endPos < pos and causing the read to return 0 bytes.
            offset_t endPos = readPos+readLen;
            size32_t copyNow;
            if (pos+len <= endPos) // common case hopefully
                copyNow = len;
            else if (pos < endPos)
                copyNow = (size32_t)(endPos-pos); // bounded by readLen, so fits in 32bit
            else // nothing to copy
                break;
            // offset within the cached block is computed first, to keep the pointer arithmetic in range
            memcpy(dest, ((byte *)buffer) + (pos-readPos), copyNow);
            len -= copyNow;
            pos += copyNow;
            dest += copyNow;
            totalCopied += copyNow;
        }
        return totalCopied;
    }
    virtual offset_t size() override { return io->size(); }
    // This wrapper is for reading only; none of the following are supported
    virtual size32_t write(offset_t pos, size32_t len, const void * data) override { throwUnexpected(); }
    virtual offset_t appendFile(IFile *file, offset_t pos=0, offset_t len=(offset_t)-1) override { throwUnexpected(); }
    virtual void setSize(offset_t size) override { throwUnexpected(); }
    virtual void flush() override { throwUnexpected(); }
    virtual void close() override { io->close(); }
    // Statistics are those of the wrapped IFileIO, i.e. they reflect the block reads actually issued
    virtual unsigned __int64 getStatistic(StatisticKind kind) override { return io->getStatistic(kind); }
};

// Create a read-only IFileIO that reads from 'base' in blocks of 'blockSize' bytes.
// 'base' is handed over to the returned object (callers pass an unlinked/getClear()'d pointer).
// Returns nullptr if 'base' is null, so that callers which wrap the result of a failed
// open still see a null IFileIO, rather than a wrapper that would dereference null on first use.
extern IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize)
{
    if (nullptr == base)
        return nullptr;
    return new CBlockedFileIO(base, blockSize);
}

// Cache/update plane index blocked IO settings
// planeBlockedIOMap maps plane name -> blocked IO size in bytes (0 meaning not configured/disabled).
// It is rebuilt from the configured planes each time the config update hook fires,
// and is protected by planeBlockedIOMapCrit.
static unsigned planeBlockIOMapCBId = 0;
static std::unordered_map<std::string, size32_t> planeBlockedIOMap;
static CriticalSection planeBlockedIOMapCrit;
MODULE_INIT(INIT_PRIORITY_STANDARD)
{
    // NB: the callback is stored by installConfigUpdateHook, so it must not capture anything local.
    // It only uses the file-level statics above, which need no capture.
    auto updateFunc = [](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
    {
        CriticalBlock b(planeBlockedIOMapCrit);
        planeBlockedIOMap.clear();
        Owned<IPropertyTreeIterator> planesIter = getPlanesIterator(nullptr, nullptr);
        ForEach(*planesIter)
        {
            const IPropertyTree &plane = planesIter->query();
            const char *planeName = plane.queryProp("@name");
            // A plane without a name cannot be looked up, and constructing a std::string key from
            // a null pointer is undefined behaviour. Presumably queryProp returns null when the
            // attribute is missing - TODO confirm.
            if (nullptr != planeName)
            {
                int blockedFileIOKB = plane.getPropInt("@blockedFileIOKB");
                // Treat a negative setting as "not configured", rather than letting it convert to a huge unsigned size
                size32_t blockedFileIOSize = (blockedFileIOKB > 0) ? ((size32_t)blockedFileIOKB) * 1024 : 0;
                planeBlockedIOMap[planeName] = blockedFileIOSize;
            }
        }
    };
    planeBlockIOMapCBId = installConfigUpdateHook(updateFunc, true);
    return true;
}

// Unregister the config update hook that MODULE_INIT installed to maintain planeBlockedIOMap
MODULE_EXIT()
{
    const unsigned hookId = planeBlockIOMapCBId;
    removeConfigUpdateHook(hookId);
}


// Return the cached blocked IO size (in bytes) for the named plane,
// or 'defaultSize' if 'planeName' is null or there is no entry for that plane.
size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize)
{
    // The lookup key is a std::string; constructing one from a null pointer is undefined behaviour
    if (nullptr == planeName)
        return defaultSize;
    CriticalBlock b(planeBlockedIOMapCrit);
    auto it = planeBlockedIOMap.find(planeName);
    if (it != planeBlockedIOMap.end())
        return it->second;
    return defaultSize;
}
6 changes: 6 additions & 0 deletions system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,14 @@ jlib_decl IFileEventWatcher *createFileEventWatcher(FileWatchFunc callback);

//---- Storage plane related functions ----------------------------------------------------

interface IPropertyTree;
interface IPropertyTreeIterator;
extern jlib_decl IPropertyTree * getHostGroup(const char * name, bool required);
extern jlib_decl IPropertyTree * getStoragePlane(const char * name);
extern jlib_decl IPropertyTree * getRemoteStorage(const char * name);
extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name);

extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize);
extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize=0);

#endif
10 changes: 7 additions & 3 deletions system/jlib/jptree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9089,10 +9089,14 @@ jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char
return loadConfiguration(componentDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute, monitor);
}

void replaceComponentConfig(IPropertyTree *newComponentConfig)
void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig)
{
CriticalBlock b(configCS);
componentConfiguration.set(newComponentConfig);
{
CriticalBlock b(configCS);
componentConfiguration.set(newComponentConfig);
globalConfiguration.set(newGlobalConfig);
}
executeConfigUpdaterCallbacks();
}

class CYAMLBufferReader : public CInterfaceOf<IPTreeReader>
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jptree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ jlib_decl void mergeConfiguration(IPropertyTree & target, const IPropertyTree &
jlib_decl IPropertyTree * loadArgsIntoConfiguration(IPropertyTree *config, const char * * argv, std::initializer_list<const std::string> ignoreOptions = {});
jlib_decl IPropertyTree * loadConfiguration(IPropertyTree * defaultConfig, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true);
jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute=nullptr, bool monitor=true);
jlib_decl void replaceComponentConfig(IPropertyTree *newComponentConfig);
jlib_decl void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig);
jlib_decl IPropertyTree * getCostsConfiguration();

//The following can only be called after loadConfiguration has been called. All components must call loadConfiguration().
Expand Down
9 changes: 8 additions & 1 deletion thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,14 @@ class CIndexReadSlaveBase : public CSlaveActivity

// local key handling

lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics));
size32_t blockedSize = 0;
if (!helper->hasSegmentMonitors()) // unfiltered
{
StringBuffer planeName;
part.queryOwner().getClusterLabel(0, planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));

RemoteFilename rfn;
part.getFilename(0, rfn);
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/graph/thgraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ interface IExpander;
interface IThorFileCache : extends IInterface
{
virtual bool remove(const char *filename, unsigned crc) = 0;
virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr, const StatisticsMapping & _statMapping=diskLocalStatistics) = 0;
virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilenae, IPartDescriptor &partDesc, IExpander *expander=nullptr, const StatisticsMapping & _statMapping=diskLocalStatistics, size32_t blockedFileIOSize=0) = 0;
};

class graph_decl CThorResourceBase : implements IThorResource, public CInterface
Expand Down
21 changes: 17 additions & 4 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,7 @@ class CLazyFileIO : public CInterfaceOf<IFileIO>
Owned<IFileIO> iFileIO; // real IFileIO
CActivityBase *activity = nullptr;
StringAttr filename, id;
size32_t blockedFileIOSize = 0;

IFileIO *getFileIO()
{
Expand All @@ -2056,9 +2057,15 @@ class CLazyFileIO : public CInterfaceOf<IFileIO>
return iFileIO.getClear();
}
public:
CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander, const StatisticsMapping & _statMapping=diskLocalStatistics)
: cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander), fileStats(_statMapping)
CLazyFileIO(CFileCache &_cache, const char *_filename, const char *_id, IActivityReplicatedFile *_repFile, bool _compressed, IExpander *_expander, const StatisticsMapping & _statMapping, size32_t _blockedFileIOSize)
: cache(_cache), filename(_filename), id(_id), repFile(_repFile), compressed(_compressed), expander(_expander),
fileStats(_statMapping), blockedFileIOSize(_blockedFileIOSize)
{
if (blockedFileIOSize) // enabled
{
if (compressed || expander)
blockedFileIOSize = 0; // ignore. Compressed files use their own blocked format, but may want to revisit this area.
}
}
virtual void beforeDispose() override;
void setActivity(CActivityBase *_activity)
Expand Down Expand Up @@ -2190,7 +2197,7 @@ class CFileCache : public CSimpleInterfaceOf<IThorFileCache>
CriticalBlock b(crit);
return _remove(id);
}
virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander, const StatisticsMapping & _statMapping) override
virtual IFileIO *lookupIFileIO(CActivityBase &activity, const char *logicalFilename, IPartDescriptor &partDesc, IExpander *expander, const StatisticsMapping & _statMapping, size32_t blockedFileIOSize) override
{
StringBuffer filename;
RemoteFilename rfn;
Expand All @@ -2200,13 +2207,15 @@ class CFileCache : public CSimpleInterfaceOf<IThorFileCache>
unsigned crc = partDesc.queryProperties().getPropInt("@fileCrc");
if (crc)
id.append(crc);
if (blockedFileIOSize)
id.append('_').append(blockedFileIOSize);
CriticalBlock b(crit);
CLazyFileIO * file = files.find(id);
if (!file || !file->isAliveAndLink())
{
Owned<IActivityReplicatedFile> repFile = createEnsurePrimaryPartFile(logicalFilename, &partDesc);
bool compressed = partDesc.queryOwner().isCompressed();
file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander, _statMapping);
file = new CLazyFileIO(*this, filename, id, repFile.getClear(), compressed, expander, _statMapping, blockedFileIOSize);
files.replace(* file); // NB: files does not own 'file', CLazyFileIO will remove itself from cache on destruction

/* NB: there will be 1 CLazyFileIO per physical file part name
Expand Down Expand Up @@ -2246,7 +2255,11 @@ IFileIO *CLazyFileIO::getOpenFileIO(CActivityBase &activity)
else if (compressed)
iFileIO.setown(createCompressedFileReader(iFile));
else
{
iFileIO.setown(iFile->open(IFOread));
if (blockedFileIOSize)
iFileIO.setown(createBlockedIO(iFileIO.getClear(), blockedFileIOSize));
}
if (!iFileIO.get())
throw MakeThorException(0, "CLazyFileIO: failed to open: %s", filename.get());
}
Expand Down
1 change: 1 addition & 0 deletions thorlcr/master/thmastermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class CRegistryServer : public CSimpleInterface
msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR);
processGroup->serialize(msg);
globals->serialize(msg);
getGlobalConfigSP()->serialize(msg);
msg.append(masterSlaveMpTag);
msg.append(kjServiceMpTag);
if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
Expand Down
11 changes: 6 additions & 5 deletions thorlcr/slave/thslavemain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,22 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
msg.read(vmajor);
msg.read(vminor);
Owned<IGroup> processGroup = deserializeIGroup(msg);
Owned<IPropertyTree> masterComponentConfig = createPTree(msg);
Owned<IPropertyTree> masterGlobalConfig = createPTree(msg);
mySlaveNum = (unsigned)processGroup->rank(queryMyNode());
assertex(NotFound != mySlaveNum);
mySlaveNum++; // 1 based;

unsigned configSlaveNum = globals->getPropInt("@slavenum", NotFound);
Owned<IPropertyTree> masterComponentConfig = createPTree(msg);
if (NotFound == configSlaveNum)
globals->setPropInt("@slavenum", mySlaveNum);
else
assertex(mySlaveNum == configSlaveNum);

Owned<IPropertyTree> mergedGlobals = createPTreeFromIPT(globals);
mergeConfiguration(*mergedGlobals, *masterComponentConfig);
replaceComponentConfig(mergedGlobals);
globals.set(mergedGlobals);
Owned<IPropertyTree> mergedComponentConfig = createPTreeFromIPT(globals);
mergeConfiguration(*mergedComponentConfig, *masterComponentConfig);
replaceComponentConfig(mergedComponentConfig, masterGlobalConfig);
globals.set(mergedComponentConfig);

#ifdef _DEBUG
unsigned holdSlave = globals->getPropInt("@holdSlave", NotFound);
Expand Down

0 comments on commit 4a0099b

Please sign in to comment.