Skip to content

Commit

Permalink
HPCC-28555 Add caching, restrict to unfiltered, and simplify
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jul 6, 2023
1 parent dc42ba7 commit 9a00180
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 63 deletions.
12 changes: 3 additions & 9 deletions dali/base/dautils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,17 @@ void getPlaneHosts(StringArray &hosts, IPropertyTree *plane)
}
}

constexpr const char * lz_plane_path = "storage/planes[@category='lz']";

IPropertyTreeIterator * getDropZonePlanesIterator(const char * name)
{
StringBuffer xpath(lz_plane_path);
if (!isEmptyString(name))
xpath.appendf("[@name='%s']", name);
return getGlobalConfigSP()->getElements(xpath);
return getPlanesIterator("lz", name);
}

IPropertyTree * getDropZonePlane(const char * name)
{
if (isEmptyString(name))
throw makeStringException(-1, "Drop zone name required");
StringBuffer xpath(lz_plane_path);
xpath.appendf("[@name='%s']", name);
return getGlobalConfigSP()->getPropTree(xpath);
Owned<IPropertyTreeIterator> iter = getDropZonePlanesIterator(name);
return iter->first() ? &iter->get() : nullptr;
}

IPropertyTree * findPlane(const char *category, const char * path, const char * host, bool ipMatch, bool mustMatch)
Expand Down
16 changes: 7 additions & 9 deletions ecl/hthor/hthorkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ class CHThorIndexReadActivityBase : public CHThorActivityBase
bool singlePart = false; // a single part index, not part of a super file - optimize so never reload the part.
bool localSortKey = false;
bool initializedFileInfo = false;
int configUseBlockedIndexIO = -1; // tri-state (-1 = not set)
size32_t configBlockedIndexIOSize = 0;

//for layout translation
Owned<const IDynamicTransform> layoutTrans;
Expand Down Expand Up @@ -326,10 +324,6 @@ CHThorIndexReadActivityBase::CHThorIndexReadActivityBase(IAgentContext &_agent,
if (recordTranslationModeHintText)
recordTranslationModeHint = getTranslationMode(recordTranslationModeHintText, true);
}
// can be defined as runtime option, but normally defined at plane level
// see call to getBlockedIndexIOSize
configUseBlockedIndexIO = agent.queryWorkUnit()->getDebugValueInt("useBlockedIndexIO", -1);
configBlockedIndexIOSize = agent.queryWorkUnit()->getDebugValueInt("blockedIndexIOKB", 0) * 1024;
}

CHThorIndexReadActivityBase::~CHThorIndexReadActivityBase()
Expand Down Expand Up @@ -601,9 +595,13 @@ void CHThorIndexReadActivityBase::killPart()
bool CHThorIndexReadActivityBase::setCurrentPart(unsigned whichPart)
{
IDistributedFilePart &part = df->queryPart(whichPart);
StringBuffer planeName;
df->getClusterName(part.copyClusterNum(0), planeName);
size32_t blockedSize = getBlockedIndexIOSize(planeName, configUseBlockedIndexIO, configBlockedIndexIOSize);
size32_t blockedSize = 0;
if (!helper.hasSegmentMonitors()) // unfiltered
{
StringBuffer planeName;
df->getClusterName(part.copyClusterNum(0), planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
keyIndex.setown(openKeyFile(part, blockedSize));
if(df->numParts() == 1)
verifyIndex(keyIndex);
Expand Down
12 changes: 4 additions & 8 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,10 @@
"waitForMount": {
"type": "boolean"
},
"useBlockedIndexIO": {
"description": "Use blocked IO read for serial index reads",
"type": "boolean",
"default": false
},
"blockedIndexIOKB": {
"description": "Block size (KB) to use for serial index reading when useBlockedIndexIO is on",
"type": "integer"
"blockedFileIOKB": {
"description": "Optimal block size for efficient reading from this plane. Implementations will use if they can",
"type": "integer",
"default": 0
},
"components": {},
"prefix": {},
Expand Down
66 changes: 43 additions & 23 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7568,6 +7568,16 @@ IPropertyTree * getRemoteStorage(const char * name)
return global->getPropTree(xpath);
}

IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name)
{
StringBuffer xpath("storage/planes");
if (!isEmptyString(category))
xpath.appendf("[@category='%s']", category);
if (!isEmptyString(name))
xpath.appendf("[@name='%s']", name);
return getGlobalConfigSP()->getElements(xpath);
}

IAPICopyClient * createApiCopyClient(IStorageApiInfo * source, IStorageApiInfo * target)
{
ReadLockBlock block(containedFileHookLock);
Expand Down Expand Up @@ -7639,34 +7649,44 @@ class CBlockedFileIO : public CSimpleInterfaceOf<IFileIO>

extern IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize)
{
PROGLOG("createBlockedIO(%u)", (unsigned)blockSize);
PROGLOG("createBlockedIO - blockSize = %u", blockSize);
return new CBlockedFileIO(base, blockSize);
}

static constexpr size32_t defaultBlockedIndexIOKB = 1024;
size32_t getBlockedIndexIOSize(const char *planeName, int configUseBlockedIndexIO, size32_t configBlockedIndeIOSize)
// Cache/update plane index blocked IO settings
static unsigned planeBlockIOMapCBId = 0;
static std::unordered_map<std::string, size32_t> planeBlockedIOMap;
static CriticalSection planeBlockedIOMapCrit;
MODULE_INIT(INIT_PRIORITY_STANDARD)
{
size32_t blockedSize = 0; // off
if (0 != configUseBlockedIndexIO)
auto updateFunc = [&](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
{
if (1 == configUseBlockedIndexIO)
blockedSize = configBlockedIndeIOSize;
if (0 == blockedSize)
CriticalBlock b(planeBlockedIOMapCrit);
planeBlockedIOMap.clear();
Owned<IPropertyTreeIterator> planesIter = getPlanesIterator(nullptr, nullptr);
ForEach(*planesIter)
{
// could cache, but not sure it's worth it
int useBlockedIndexIO = configUseBlockedIndexIO;
Owned<IPropertyTree> plane = getStoragePlane(planeName);
if (plane)
{
if ((1 == configUseBlockedIndexIO) || plane->getPropBool("@useBlockedIndexIO"))
{
useBlockedIndexIO = 1;
blockedSize = plane->getPropInt("@blockedIndexIOKB") * 1024;
}
}
if ((1 == useBlockedIndexIO) && (0 == blockedSize))
blockedSize = defaultBlockedIndexIOKB * 1024;
const IPropertyTree &plane = planesIter->query();
size32_t blockedFileIOSize = plane.getPropInt("@blockedFileIOKB") * 1024;
planeBlockedIOMap[plane.queryProp("@name")] = blockedFileIOSize;
}
}
return blockedSize;
};
planeBlockIOMapCBId = installConfigUpdateHook(updateFunc, true);
return true;
}

MODULE_EXIT()
{
removeConfigUpdateHook(planeBlockIOMapCBId);
}


size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize)
{
CriticalBlock b(planeBlockedIOMapCrit);
auto it = planeBlockedIOMap.find(planeName);
if (it != planeBlockedIOMap.end())
return it->second;
else
return defaultSize;
}
8 changes: 5 additions & 3 deletions system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,14 @@ jlib_decl IFileEventWatcher *createFileEventWatcher(FileWatchFunc callback);

//---- Storage plane related functions ----------------------------------------------------

interface IPropertyTree;
interface IPropertyTreeIterator;
extern jlib_decl IPropertyTree * getHostGroup(const char * name, bool required);
extern jlib_decl IPropertyTree * getStoragePlane(const char * name);
extern jlib_decl IPropertyTree * getRemoteStorage(const char * name);
extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name);

constexpr size32_t defaultBlockFileIOSize = 0x100000; // 1MB
extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize=defaultBlockFileIOSize);
extern jlib_decl size32_t getBlockedIndexIOSize(const char *planeName, int configUseBlockedIndexIO, size32_t configBlockedIndeIOSize);
extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize);
extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t defaultSize=0);

#endif
2 changes: 1 addition & 1 deletion system/jlib/jptree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8878,7 +8878,7 @@ void executeConfigUpdaterCallbacks()
{
if (!configFileUpdater) // NB: executeConfigUpdaterCallbacks should always be called after configFileUpdater is initialized
return;
configFileUpdater->executeCallbacks(componentConfiguration.getLink(), globalConfiguration.getLink());
configFileUpdater->executeCallbacks(componentConfiguration, globalConfiguration);
}

void CConfigUpdateHook::clear()
Expand Down
17 changes: 7 additions & 10 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
Owned<IFileIO> lazyIFileIO;
mutable CriticalSection ioStatsCS;
unsigned fileTableStart = NotFound;
int configUseBlockedIndexIO = -1; // tri-state (-1 = not set)
size32_t configBlockedIndexIOSize = 0;

template<class StatProvider>
class CCaptureIndexStats
Expand Down Expand Up @@ -310,9 +308,13 @@ class CIndexReadSlaveBase : public CSlaveActivity

// local key handling

StringBuffer planeName;
part.queryOwner().getClusterLabel(0, planeName);
size32_t blockedSize = getBlockedIndexIOSize(planeName, configUseBlockedIndexIO, configBlockedIndexIOSize);
size32_t blockedSize = 0;
if (!helper->hasSegmentMonitors()) // unfiltered
{
StringBuffer planeName;
part.queryOwner().getClusterLabel(0, planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));

RemoteFilename rfn;
Expand Down Expand Up @@ -557,11 +559,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
serializer.set(queryRowSerializer());
helper->setCallback(&callback);
reInit = 0 != (helper->getFlags() & (TIRvarfilename|TIRdynamicfilename));

// can be defined as runtime option, but normally defined at plane level
// see call to getBlockedIndexIOSize
configUseBlockedIndexIO = getOptInt("useBlockedIndexIO", -1);
configBlockedIndexIOSize = getOptInt("blockedIndexIOKB") * 1024;
}
rowcount_t getLocalCount(const rowcount_t keyedLimit, bool hard)
{
Expand Down

0 comments on commit 9a00180

Please sign in to comment.