diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp
index fd2959f3a3a..2378dc4ae8b 100644
--- a/common/thorhelper/thorcommon.hpp
+++ b/common/thorhelper/thorcommon.hpp
@@ -278,10 +278,10 @@ class ActivityTimer
     ActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
     : accumulator(_accumulator), enabled(_enabled), isFirstRow(false)
     {
-        if (enabled)
+        if (likely(enabled))
         {
             startCycles = get_cycles_now();
-            if (!accumulator.firstRow)
+            if (unlikely(!accumulator.firstRow))
            {
                 isFirstRow = true;
                 accumulator.startCycles = startCycles;
@@ -294,13 +294,13 @@ class ActivityTimer
     ~ActivityTimer()
     {
-        if (enabled)
+        if (likely(enabled))
         {
             cycle_t nowCycles = get_cycles_now();
             accumulator.endCycles = nowCycles;
             cycle_t elapsedCycles = nowCycles - startCycles;
             accumulator.totalCycles += elapsedCycles;
-            if (isFirstRow)
+            if (unlikely(isFirstRow))
                 accumulator.firstExitCycles = nowCycles;
         }
     }
@@ -316,7 +316,7 @@ class SimpleActivityTimer
     inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled)
     : accumulator(_accumulator), enabled(_enabled)
     {
-        if (enabled)
+        if (likely(enabled))
             startCycles = get_cycles_now();
         else
             startCycles = 0;
@@ -324,7 +324,7 @@ class SimpleActivityTimer
     inline ~SimpleActivityTimer()
     {
-        if (enabled)
+        if (likely(enabled))
         {
             cycle_t nowCycles = get_cycles_now();
             cycle_t elapsedCycles = nowCycles - startCycles;
diff --git a/common/thorhelper/thorfile.cpp b/common/thorhelper/thorfile.cpp
index 4d7f8cc6764..d27e6282dc1 100644
--- a/common/thorhelper/thorfile.cpp
+++ b/common/thorhelper/thorfile.cpp
@@ -242,6 +242,8 @@ static void gatherDerivedIndexInformation(DerivedIndexInformation & result, IDis
         result.sizeDiskLeaves = result.numLeafNodes * nodeSize;
         result.sizeDiskBlobs = result.numBlobNodes * nodeSize;
         result.sizeDiskBranches = result.numBranchNodes * nodeSize;
+        result.sizeMemoryBranches = attrs.getPropInt64("@branchMemorySize");
+        result.sizeMemoryLeaves = attrs.getPropInt64("@leafMemorySize");
     }
     else
     {
@@ -283,11 +285,12 @@ static void gatherDerivedIndexInformation(DerivedIndexInformation & result, IDis
     result.sizeOriginalData = attrs.getPropInt64("@uncompressedSize");
 
     //The following will depend on the compression format - e.g. if compressed searching is implemented
-    result.sizeMemoryBranches = result.sizeOriginalBranches;
+    if (result.sizeMemoryBranches == 0)
+        result.sizeMemoryBranches = result.sizeOriginalBranches;
 
     //NOTE: sizeOriginalData now includes the blob sizes that are removed before passing to the builder
     // if the original blob size is recorded then use it, otherwise estimate it
-    if (result.sizeOriginalData)
+    if (result.sizeOriginalData && (result.sizeMemoryLeaves == 0))
     {
         offset_t originalBlobSize = attrs.getPropInt64("@originalBlobSize");
         if (result.numBlobNodes == 0)
diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp
index 154505e449c..b86fc1e70ac 100644
--- a/common/workunit/workunit.cpp
+++ b/common/workunit/workunit.cpp
@@ -2696,6 +2696,45 @@ cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
     return totalCost;
 }
 
+void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
+{
+    WuScopeFilter filter;
+    if (!isEmptyString(scope))
+        filter.addScope(scope);
+    else
+    {
+        filter.addScope("");
+        filter.addSource("global");
+    }
+    filter.setIncludeNesting(1);
+    filter.addOutputStatistic(StSizeGraphSpill);
+    filter.addRequiredStat(StSizeGraphSpill);
+    filter.finishedFilter();
+    Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
+    peakSizeSpill = 0;
+    for (it->first(); it->isValid(); )
+    {
+        stat_type value = 0;
+        if (it->getStat(StSizeGraphSpill, value))
+        {
+            if (value>peakSizeSpill)
+                peakSizeSpill = value;
+            it->nextSibling();
+        }
+        else
+        {
+            it->next();
+        }
+    }
+}
+
+void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
+{
+    stat_type peakSizeSpill = 0;
+    gatherSpillSize(wu, scope, peakSizeSpill);
+    if (peakSizeSpill)
+        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
+}
 
 //---------------------------------------------------------------------------------------------------------------------
 
diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp
index adb79dd614e..73f4921bef0 100644
--- a/common/workunit/workunit.hpp
+++ b/common/workunit/workunit.hpp
@@ -1722,6 +1722,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeTyp
 extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
 extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
 extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
+extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
 extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
 extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
 #if 0
diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp
index 6d513f88983..0b0610c3adb 100644
--- a/ecl/eclagent/eclagent.cpp
+++ b/ecl/eclagent/eclagent.cpp
@@ -1985,7 +1985,7 @@ void EclAgent::doProcess()
             const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
             if (diskAccessCost)
                 w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
-
+            updateSpillSize(w, nullptr, SSTglobal);
             addTimings(w);
 
             switch (w->getState())
@@ -2534,6 +2534,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
     const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
     if (diskAccessCost)
         wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
+    updateSpillSize(wu, scope, SSTworkflow);
 }
 
 void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp
index dc656bcde9a..684be296c12 100644
--- a/ecl/hthor/hthor.cpp
+++ b/ecl/hthor/hthor.cpp
@@ -1161,6 +1161,8 @@ void CHThorIndexWriteActivity::execute()
     unsigned __int64 numBlobNodes = 0;
     unsigned __int64 numBranchNodes = 0;
     offset_t originalBlobSize = 0;
+    offset_t branchMemorySize = 0;
+    offset_t leafMemorySize = 0;
     unsigned nodeSize = 0;
 
     file.setown(createIFile(filename.get()));
@@ -1260,6 +1262,8 @@ void CHThorIndexWriteActivity::execute()
             numBranchNodes = builder->getNumBranchNodes();
             numBlobNodes = builder->getNumBlobNodes();
             originalBlobSize = bc.queryTotalSize();
+            branchMemorySize = builder->getBranchMemorySize();
+            leafMemorySize = builder->getLeafMemorySize();
 
             totalLeafNodes += numLeafNodes;
             totalBranchNodes += numBranchNodes;
@@ -1346,6 +1350,10 @@ void CHThorIndexWriteActivity::execute()
         properties.setPropInt64("@numBlobNodes", numBlobNodes);
         if (numBlobNodes)
             properties.setPropInt64("@originalBlobSize", originalBlobSize);
+        if (branchMemorySize)
+            properties.setPropInt64("@branchMemorySize", branchMemorySize);
+        if (leafMemorySize)
+            properties.setPropInt64("@leafMemorySize", leafMemorySize);
 
         size32_t keyedSize = helper.getKeyedSize();
         if (keyedSize == (size32_t)-1)
diff --git a/esp/services/ws_access/ws_accessService.cpp b/esp/services/ws_access/ws_accessService.cpp
index 56c1d1ae63a..50f56575ad4 100644
--- a/esp/services/ws_access/ws_accessService.cpp
+++ b/esp/services/ws_access/ws_accessService.cpp
@@ -1768,11 +1768,6 @@ bool Cws_accessEx::onResources(IEspContext &context, IEspResourcesRequest &req,
                         oneresource->setIsSpecial(true);
                     }
                 }
-                else if(rtype == RT_FILE_SCOPE && stricmp(rname, "file") == 0)
-                {
-                    //oneresource->setIsSpecial(true); //33067
-                    continue;
-                }
 
                 oneresource->setName(rname);
                 oneresource->setDescription(r.getDescription());
diff --git a/esp/src/eclwatch/ClusterProcessesQueryWidget.js b/esp/src/eclwatch/ClusterProcessesQueryWidget.js
index cab88c0c761..b3a388288d8 100644
--- a/esp/src/eclwatch/ClusterProcessesQueryWidget.js
+++ b/esp/src/eclwatch/ClusterProcessesQueryWidget.js
@@ -80,10 +80,14 @@ define([
         initTab: function () {
             var currSel = this.getSelectedChild();
             if (currSel && !currSel.initalized) {
-                if (currSel.id === this.legacyClustersProcessesIframeWidget.id && !this.legacyClustersProcessesIframeWidget.initalized) {
+                if (currSel.id === this.id + "_Grid") {
+                    this.refreshGrid();
+                } else if (currSel.id === this.legacyClustersProcessesIframeWidget.id && !this.legacyClustersProcessesIframeWidget.initalized) {
                     this.legacyClustersProcessesIframeWidget.init({
                         src: ESPRequest.getBaseURL("WsTopology") + "/TpClusterQuery?Type=ROOT"
                     });
+                } else {
+                    currSel.init(currSel.params);
                 }
             }
         },
diff --git a/esp/src/eclwatch/SystemServersQueryWidget.js b/esp/src/eclwatch/SystemServersQueryWidget.js
index 7622fd535e1..12a7e9c7546 100644
--- a/esp/src/eclwatch/SystemServersQueryWidget.js
+++ b/esp/src/eclwatch/SystemServersQueryWidget.js
@@ -68,10 +68,14 @@ define([
         initTab: function () {
             var currSel = this.getSelectedChild();
             if (currSel && !currSel.initalized) {
-                if (currSel.id === this.systemServersQueryWidgetIframeWidget.id && !this.systemServersQueryWidgetIframeWidget.initalized) {
+                if (currSel.id === this.id + "_Grid") {
+                    this.refreshGrid();
+                } else if (currSel.id === this.systemServersQueryWidgetIframeWidget.id && !this.systemServersQueryWidgetIframeWidget.initalized) {
                     this.systemServersQueryWidgetIframeWidget.init({
                         src: ESPRequest.getBaseURL("WsTopology") + "/TpServiceQuery?Type=ALLSERVICES"
                     });
+                } else {
+                    currSel.init(currSel.params);
                 }
             }
         },
@@ -278,7 +282,7 @@ define([
             retVal.on(".dgrid-cell .gridClick:click", function (evt) {
                 var item = retVal.row(evt).data;
                 if (evt.target.title === "Audit Log" || evt.target.title === "Component Log") {
-                    context._onOpenLog(item);
+                    context._onOpenLog(item, evt.target.title);
                 } else {
                     context._onOpenConfiguration(item);
                 }
@@ -349,11 +353,11 @@ define([
             });
         },
 
-        _onOpenLog: function (item) {
+        _onOpenLog: function (item, type) {
             var nodeTab = this.ensureLogsPane(item.Name + ": " + item.Parent.LogDirectory, {
                 params: item,
                 ParentName: item.Parent.Name,
-                LogDirectory: item.Parent.LogDirectory,
+                LogDirectory: type === "Audit Log" ? item.Parent.AuditLogDirectory : item.Parent.LogDirectory,
                 NetAddress: item.Netaddress,
                 OS: item.OS,
                 newPreflight: true
diff --git a/esp/src/eclwatch/TargetClustersQueryWidget.js b/esp/src/eclwatch/TargetClustersQueryWidget.js
index 5b470f3ae51..5487b39ad5c 100644
--- a/esp/src/eclwatch/TargetClustersQueryWidget.js
+++ b/esp/src/eclwatch/TargetClustersQueryWidget.js
@@ -56,10 +56,14 @@ define([
         initTab: function () {
             var currSel = this.getSelectedChild();
             if (currSel && !currSel.initalized) {
-                if (currSel.id === this.legacyTargetClustersIframeWidget.id && !this.legacyTargetClustersIframeWidget.initalized) {
+                if (currSel.id === this.id + "_Grid") {
+                    this.refreshGrid();
+                } else if (currSel.id === this.legacyTargetClustersIframeWidget.id && !this.legacyTargetClustersIframeWidget.initalized) {
                     this.legacyTargetClustersIframeWidget.init({
                         src: ESPRequest.getBaseURL("WsTopology") + "/TpTargetClusterQuery?Type=ROOT"
                     });
+                } else if (currSel.params.newPreflight || currSel.params.Usergenerated) { //prevents loop of pfTab.init above
+                    currSel.init(currSel.params);
                 }
             }
         },
diff --git a/esp/src/src-react/components/Files.tsx b/esp/src/src-react/components/Files.tsx
index fc31ce81bb7..45aaa89e3c5 100644
--- a/esp/src/src-react/components/Files.tsx
+++ b/esp/src/src-react/components/Files.tsx
@@ -1,5 +1,6 @@
 import * as React from "react";
 import { CommandBar, ContextualMenuItemType, ICommandBarItemProps, Icon, Link } from "@fluentui/react";
+import { scopedLogger } from "@hpcc-js/util";
 
 import * as WsDfu from "src/WsDfu";
 import { CreateDFUQueryStore } from "src/ESPLogicalFile";
 import { formatCost } from "src/Session";
@@ -21,6 +22,8 @@ import { RenameFile } from "./forms/RenameFile";
 import { ShortVerticalDivider } from "./Common";
 import { SizeMe } from "react-sizeme";
 
+const logger = scopedLogger("src-react/components/Files.tsx");
+
 const FilterFields: Fields = {
     "LogicalName": { type: "string", label: nlsHPCC.Name, placeholder: nlsHPCC.somefile },
     "Description": { type: "string", label: nlsHPCC.Description, placeholder: nlsHPCC.SomeDescription },
@@ -218,7 +221,13 @@ export const Files: React.FunctionComponent<FilesProps> = ({
         message: nlsHPCC.DeleteSelectedFiles,
         items: selection.map(s => s.Name),
         onSubmit: React.useCallback(() => {
-            WsDfu.DFUArrayAction(selection, "Delete").then(() => refreshTable(true));
+            WsDfu.DFUArrayAction(selection, "Delete")
+                .then(({ DFUArrayActionResponse }) => {
+                    const ActionResults = DFUArrayActionResponse?.ActionResults?.DFUActionInfo ?? [];
+                    ActionResults.filter(action => action?.Failed).forEach(action => logger.error(action?.ActionResult));
+                    refreshTable(true);
+                })
+                .catch(err => logger.error(err));
         }, [refreshTable, selection])
     });
diff --git a/esp/src/src-react/components/LogicalFileSummary.tsx b/esp/src/src-react/components/LogicalFileSummary.tsx
index 4201e1bdbd2..0416e253424 100644
--- a/esp/src/src-react/components/LogicalFileSummary.tsx
+++ b/esp/src/src-react/components/LogicalFileSummary.tsx
@@ -59,6 +59,8 @@ export const LogicalFileSummary: React.FunctionComponent<LogicalFileSummaryProps> = ({
+                .catch(err => logger.error(err));
         }, [file])
diff --git a/esp/src/src-react/components/SuperFileSummary.tsx b/esp/src/src-react/components/SuperFileSummary.tsx
index 69d0cb6f5b8..ba5e16172e1 100644
--- a/esp/src/src-react/components/SuperFileSummary.tsx
+++ b/esp/src/src-react/components/SuperFileSummary.tsx
@@ -46,7 +46,7 @@ export const SuperFileSummary: React.FunctionComponent<SuperFileSummaryProps> = ({
                 action: "remove",
                 superfile: file.Name,
                 subfiles: { Item: subfiles.map(file => file.Name) },
-                delete: true
+                removeSuperfile: true
             })
                 .then(() => replaceUrl("/files"))
                 .catch(err => logger.error(err))
diff --git a/esp/src/src/ESPLogicalFile.ts b/esp/src/src/ESPLogicalFile.ts
index b62354bf4e0..62f7a8a8e54 100644
--- a/esp/src/src/ESPLogicalFile.ts
+++ b/esp/src/src/ESPLogicalFile.ts
@@ -510,7 +510,7 @@ export function CreateDFUQueryStore(): BaseStore {
             return {
-                data: response.DFULogicalFiles.DFULogicalFile,
+                data: response?.DFULogicalFiles?.DFULogicalFile ?? [],
                 total: response.NumFiles
             };
         });
diff --git a/helm/examples/efs/delete-role.sh b/helm/examples/efs/delete-role.sh
index a8d1acfb21b..7f37dacb98d 100755
--- a/helm/examples/efs/delete-role.sh
+++ b/helm/examples/efs/delete-role.sh
@@ -3,6 +3,7 @@
 WORK_DIR=$(dirname $0)
 source ${WORK_DIR}/efs-env
 
-aws iam detach-role-policy --role-name ${EKS_NAME}_EFS_CSI_Role \
-  --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/AmazonEKS_EFS_CSI_Driver_Policy
-aws iam delete-role --role-name ${EKS_NAME}_EFS_CSI_Role
+echo "deleting iam role"
+echo "make sure you also uninstall the aws-efs-csi-driver helm chart"
+STACK_NAME=eksctl-${EKS_NAME}-addon-iamserviceaccount-kube-system-efs-csi-controller-sa
+aws cloudformation delete-stack --stack-name ${STACK_NAME}
\ No newline at end of file
diff --git a/helm/examples/efs/iam-policy-example.json b/helm/examples/efs/iam-policy-example.json
index 22f4c326e5c..a8f6a4f7048 100644
--- a/helm/examples/efs/iam-policy-example.json
+++ b/helm/examples/efs/iam-policy-example.json
@@ -23,6 +23,18 @@
       }
     }
   },
+  {
+    "Effect": "Allow",
+    "Action": [
+      "elasticfilesystem:TagResource"
+    ],
+    "Resource": "*",
+    "Condition": {
+      "StringLike": {
+        "aws:ResourceTag/efs.csi.aws.com/cluster": "true"
+      }
+    }
+  },
   {
     "Effect": "Allow",
     "Action": "elasticfilesystem:DeleteAccessPoint",
diff --git a/helm/examples/efs/install-csi-driver.sh b/helm/examples/efs/install-csi-driver.sh
index ed3266e0884..17b76fc7800 100755
--- a/helm/examples/efs/install-csi-driver.sh
+++ b/helm/examples/efs/install-csi-driver.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+#reference: https://docs.aws.amazon.com/eks/latest/userguide/efs-csi.html
 
 WORK_DIR=$(dirname $0)
 
@@ -6,58 +7,36 @@ source ${WORK_DIR}/efs-env
 echo "AWS_PROFILE: $AWS_PROFILE"
 
 roleName=${EKS_NAME}_EFS_CSI_Role
+EFS_CSI_POLICY_NAME=EKS_EFS_CSI_Driver_Policy
 
 create_efs_csi_driver_policy()
 {
-  echo "create efs csi driver policy"
-  #aws iam list-policies | grep -q AmazonEKS_EFS_CSI_Driver_Policy # [Errno 32] Broken pipe on WSL
-  aws iam list-policies | awk '/AmazonEKS_EFS_CSI_Driver_Policy/{print $2}' | grep -q EFS
+  echo "creating efs csi driver policy"
+  #aws iam list-policies | grep -q $EFS_CSI_POLICY_NAME # [Errno 32] Broken pipe on WSL
+  aws iam list-policies | awk "/${EFS_CSI_POLICY_NAME}/{print \$2}" | grep -q EFS
   [ $? -ne 0 ] && \
   aws iam create-policy \
-    --policy-name AmazonEKS_EFS_CSI_Driver_Policy \
+    --policy-name ${EFS_CSI_POLICY_NAME} \
     --policy-document file://${WORK_DIR}/iam-policy-example.json
 }
 
-create_iam_role()
+create_iam_role_and_kubernetes_service_account()
 {
-  echo "create efs csi driver iam role"
-  # Delete role
-  ${WORK_DIR}/delete-role.sh > /dev/null 2>&1
-
-  #aws iam list-roles | grep -q AmazonEKS_EFS_CSI_DriverRole
-  aws iam list-roles | awk '/${roleName}/{print $2}' | grep -q EFS
-  if [ $? -ne 0 ]
-  then
-    OIDC_URL=$(aws eks describe-cluster --name ${EKS_NAME} --region ${EFS_REGION} --query "cluster.identity.oidc.issuer" --output text)
-    OIDC_PROVIDER=${OIDC_URL##*/}
-    sed "s/<ACCOUNT_ID>/${ACCOUNT_ID}/g; \
-         s/<EFS_REGION>/${EFS_REGION}/g; \
-         s/<OIDC_PROVIDER>/${OIDC_PROVIDER}/g" ${WORK_DIR}/trust-policy.json.template > ${WORK_DIR}/trust-policy.json
-    echo "aws iam create-role \
-      --role-name ${roleName} \
-      --assume-role-policy-document file://${WORK_DIR}/trust-policy.json"
-    aws iam create-role \
-      --role-name ${roleName} \
-      --assume-role-policy-document file://"${WORK_DIR}/trust-policy.json"
-    aws iam attach-role-policy \
-      --policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/AmazonEKS_EFS_CSI_Driver_Policy \
-      --role-name ${roleName}
-    #rm -rf ${WORK_DIR}/trust-policy.json
-  fi
-}
-
-create_efs_service_account()
-{
-  echo "create efs service account"
-  sed "s/<ACCOUNT_ID>/${ACCOUNT_ID}/g; \
-       s/<ROLE_NAME>/${roleName}/g" ${WORK_DIR}/efs-service-account.yaml.template > ${WORK_DIR}/efs-service-account.yaml
-  kubectl apply -f ${WORK_DIR}/efs-service-account.yaml
-  #rm -rf ${WORK_DIR}/efs-service-account.yaml
+  echo "creating iam role and kubernetes service account"
+  eksctl utils associate-iam-oidc-provider --region=${EFS_REGION} --cluster=${EKS_NAME} --approve
+  eksctl create iamserviceaccount \
+    --cluster ${EKS_NAME} \
+    --namespace kube-system \
+    --name efs-csi-controller-sa \
+    --attach-policy-arn arn:aws:iam::${ACCOUNT_ID}:policy/${EFS_CSI_POLICY_NAME} \
+    --approve \
+    --override-existing-serviceaccounts \
+    --region ${EFS_REGION}
 }
 
 install_amazon_efs_driver()
 {
-  echo "install then amazon efs driver"
+  echo "installing the amazon efs driver helm chart"
   helm repo add aws-efs-csi-driver https://kubernetes-sigs.github.io/aws-efs-csi-driver/
   helm repo update
   helm upgrade -i aws-efs-csi-driver aws-efs-csi-driver/aws-efs-csi-driver \
@@ -70,7 +49,7 @@ install_amazon_efs_driver()
 create_storage_class_yaml()
 {
   echo ""
-  echo "create storageclass.yaml from storageclass.yaml.template"
+  echo "creating storageclass.yaml from storageclass.yaml.template"
   #echo "EFS_ID: ${EFS_ID} EFS_BASE_PATH: $EFS_BASE_PATH"
   sed "s/<EFS_ID>/${EFS_ID}/g; \
       s/<EFS_BASE_PATH>/\\${EFS_BASE_PATH}/g" ${WORK_DIR}/storageclass.yaml.template > ${WORK_DIR}/storageclass.yaml
@@ -80,8 +59,7 @@
 helm list | grep -q ${EFS_NAME}
 if [[ $? -ne 0 ]]
 then
   create_efs_csi_driver_policy
-  create_iam_role
-  create_efs_service_account
+  create_iam_role_and_kubernetes_service_account
   ${WORK_DIR}/associate-oidc.sh
   install_amazon_efs_driver
   create_storage_class_yaml
diff --git a/helm/hpcc/templates/eclagent.yaml b/helm/hpcc/templates/eclagent.yaml
index 1d915719861..245e7554577 100644
--- a/helm/hpcc/templates/eclagent.yaml
+++ b/helm/hpcc/templates/eclagent.yaml
@@ -85,10 +85,8 @@ data:
{{- include "hpcc.addResources" (dict "me" .me.resources) | indent 12 }}
{{- $appCmd := printf "%s %s %s _HPCC_ARGS_" $apptype (include "hpcc.configArg" .me) (include "hpcc.daliArg" (dict "root" .root "component" "ECL Agent" "optional" false )) }}
{{ include "hpcc.addCommandAndLifecycle" (. | merge (dict "command" $appCmd)) | indent 12 }}
-{{- if .env }}
             env:
{{ include "hpcc.mergeEnvironments" .env | indent 12 }}
-{{- end }}
             workingDir: /var/lib/HPCCSystems
             volumeMounts:
{{ include "hpcc.addConfigMapVolumeMount" .me | indent 12 }}
diff --git a/helm/hpcc/templates/thor.yaml b/helm/hpcc/templates/thor.yaml
index 009eca0fb58..5eafff196c0 100644
--- a/helm/hpcc/templates/thor.yaml
+++ b/helm/hpcc/templates/thor.yaml
@@ -110,10 +110,8 @@ data:
{{- include "hpcc.addResources" (dict "me" .eclAgentResources) | indent 12 }}
{{- $agentCmd := printf "%s %s %s _HPCC_ARGS_" $eclAgentType (include "hpcc.configArg" .me) (include "hpcc.daliArg" (dict "root" .root "component" "Thor" "optional" false)) }}
{{ include "hpcc.addCommandAndLifecycle" (. | merge (dict "command" $agentCmd)) | indent 12 }}
-{{- if .env }}
             env:
{{ include "hpcc.mergeEnvironments" .env | indent 12 }}
-{{- end }}
             workingDir: /var/lib/HPCCSystems
             volumeMounts:
{{ include "hpcc.addConfigMapVolumeMount" .me | indent 12 }}
@@ -179,10 +177,8 @@ data:
{{- include "hpcc.addResources" (dict "me" $thorScope.managerResources) | indent 12 }}
{{- $thorManagerCmd := printf "thormaster_lcr %s %s _HPCC_ARGS_" (include "hpcc.configArg" .me) (include "hpcc.daliArg" (dict "root" .root "component" "Thor" "optional" false)) }}
{{ include "hpcc.addCommandAndLifecycle" (. | merge (dict "command" $thorManagerCmd)) | indent 12 }}
-{{- if .env }}
             env:
{{ include "hpcc.mergeEnvironments" .env | indent 12 }}
-{{- end }}
             workingDir: /var/lib/HPCCSystems
             volumeMounts:
{{ include "hpcc.addConfigMapVolumeMount" .me | indent 12 }}
@@ -250,10 +246,8 @@ data:
{{- include "hpcc.addResources" (dict "me" $thorScope.workerResources) | indent 12 }}
{{- $thorWorkerCmd := printf "thorslave_lcr %s %s _HPCC_ARGS_ --slaveport=%d" (include "hpcc.configArg" $configCtx.me) (include "hpcc.daliArg" (dict "root" $configCtx.root "component" "Thor" "optional" false)) $slavePort }}
{{ include "hpcc.addCommandAndLifecycle" ($configCtx | merge (dict "command" $thorWorkerCmd)) | indent 12 }}
-{{- if $configCtx.me.env }}
             env:
{{ include "hpcc.mergeEnvironments" $configCtx.me.env | indent 12 }}
-{{- end }}
             workingDir: /var/lib/HPCCSystems
             volumeMounts:
{{ include "hpcc.addConfigMapVolumeMount" $configCtx.me | indent 12 }}
diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp
index b2d4cf66f36..015d5981bba 100644
--- a/roxie/ccd/ccdmain.cpp
+++ b/roxie/ccd/ccdmain.cpp
@@ -934,7 +934,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
         }
         acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
-        headRegionSize = topology->getPropInt("@headRegionSize", 50);
+        headRegionSize = topology->getPropInt("@headRegionSize", 0);
         ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
         statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
         roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0));
diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp
index 147005f7a80..2f860f137e2 100644
--- a/roxie/ccd/ccdserver.cpp
+++ b/roxie/ccd/ccdserver.cpp
@@ -12439,6 +12439,8 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
     offset_t offsetBranches = 0;
     offset_t uncompressedSize = 0;
     offset_t originalBlobSize = 0;
+    offset_t branchMemorySize = 0;
+    offset_t leafMemorySize = 0;
     unsigned nodeSize = 0;
 
     void updateWorkUnitResult()
@@ -12650,6 +12652,8 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
             numBlobNodes = builder->getNumBlobNodes();
             offsetBranches = builder->getOffsetBranches();
             originalBlobSize = bc.queryTotalSize();
+            branchMemorySize = builder->getBranchMemorySize();
+            leafMemorySize = builder->getLeafMemorySize();
 
             noteStatistic(StNumLeafCacheAdds, numLeafNodes);
             noteStatistic(StNumNodeCacheAdds, numBranchNodes);
@@ -12739,6 +12743,10 @@ class CRoxieServerIndexWriteActivity : public CRoxieServerInternalSinkActivity,
         properties.setPropInt64("@numBlobNodes", numBlobNodes);
         if (numBlobNodes)
             properties.setPropInt64("@originalBlobSize", originalBlobSize);
+        if (branchMemorySize)
+            properties.setPropInt64("@branchMemorySize", branchMemorySize);
+        if (leafMemorySize)
+            properties.setPropInt64("@leafMemorySize", leafMemorySize);
 
         size32_t keyedSize = helper.getKeyedSize();
         if (keyedSize == (size32_t)-1)
@@ -25859,7 +25867,7 @@ class KeyedJoinRemoteAdaptor : public CRemoteResultAdaptor // MORE - not sure it
         ActivityTimer t(activityStats, timeActivities);
         for (;;)
         {
-            if (eof)
+            if (unlikely(eof))
                 return NULL;
             processAgentResults();
             if (ready.ordinality())
diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp
index a729839ef09..fa05ca1131d 100644
--- a/roxie/udplib/udptrr.cpp
+++ b/roxie/udplib/udptrr.cpp
@@ -1329,14 +1329,12 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         unsigned lastPacketsOOO = 0;
         unsigned lastUnwantedDiscarded = 0;
         unsigned timeout = 5000;
-        DataBuffer *b = nullptr;
+        roxiemem::IDataBufferManager * udpBufferManager = bufferManager;
+        DataBuffer *b = udpBufferManager->allocate();
         while (running)
         {
             try
             {
-                if (!b)
-                    b = bufferManager->allocate();
-
                 unsigned int res;
                 while (true)
                 {
@@ -1374,7 +1372,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
                     }
                 }
                 parent.input_queue->pushOwn(b);
-                b = nullptr;
+                b = udpBufferManager->allocate();
 
                 if (udpStatsReportInterval)
                 {
diff --git a/system/jhtree/ctfile.hpp b/system/jhtree/ctfile.hpp
index 701d053b4a9..2d29649baaa 100644
--- a/system/jhtree/ctfile.hpp
+++ b/system/jhtree/ctfile.hpp
@@ -512,6 +512,8 @@ interface IIndexCompressor : public IInterface
 {
     virtual const char *queryName() const = 0;
     virtual CWriteNode *createNode(offset_t _fpos, CKeyHdr *_keyHdr, bool isLeafNode) const = 0;
+    virtual offset_t queryBranchMemorySize() const = 0;
+    virtual offset_t queryLeafMemorySize() const = 0;
 };
diff --git a/system/jhtree/jhinplace.cpp b/system/jhtree/jhinplace.cpp
index 5028fc014e4..f35a89a87db 100644
--- a/system/jhtree/jhinplace.cpp
+++ b/system/jhtree/jhinplace.cpp
@@ -2129,6 +2129,7 @@ void CInplaceBranchWriteNode::write(IFileIOStream *out, CRC32 *crc)
         assertex(inplaceSize == writtenSize);
 
     ctx.totalDataSize += data.length();
+    ctx.branchMemorySize += data.length();
     assertex(data.length() == getDataSize());
 }
 
@@ -2327,6 +2328,7 @@ bool CInplaceLeafWriteNode::add(offset_t pos, const void * _data, size32_t size,
         saveLastKey(data, size, sequence);
 
     hdr.numKeys++;
+    totalUncompressedSize += size;
     return true;
 }
 
@@ -2480,12 +2482,18 @@ void CInplaceLeafWriteNode::write(IFileIOStream *out, CRC32 *crc)
                 serializePacked(data, trailingSize);
                 data.append(trailingSize, uncompressed.bytes() + firstUncompressed);
             }
+            if (payloadCompression != COMPRESS_METHOD_RANDROW)
+            {
+                //Calculate the size of the payload when expanded, currently the compressed payload is kept in memory, so do not subtract that
+                ctx.leafMemorySize += (totalUncompressedSize - (keyCompareLen * hdr.numKeys));
+            }
             break;
         }
     }
 
     ctx.totalDataSize += data.length();
+    ctx.leafMemorySize += data.length();
     assertex(data.length() == getDataSize(true));
 }
diff --git a/system/jhtree/jhinplace.hpp b/system/jhtree/jhinplace.hpp
index 6f95918629d..7082ed48aa8 100644
--- a/system/jhtree/jhinplace.hpp
+++ b/system/jhtree/jhinplace.hpp
@@ -171,6 +171,8 @@ class jhtree_decl InplaceKeyBuildContext
     offset_t totalDataSize = 0;
     offset_t numLeafNodes = 0;
    offset_t numBlockCompresses = 0;
+    offset_t branchMemorySize = 0;
+    offset_t leafMemorySize = 0;
 
     struct
     {
         double minCompressionThreshold = 0.95; // use uncompressed if compressed is > 95% uncompressed
         unsigned maxCompressionFactor = 25; // Don't compress payload to less than 4% of the original by default (because when it is read it will use lots of memory)
@@ -296,6 +298,7 @@ class jhtree_decl CInplaceLeafWriteNode : public CInplaceWriteNode
     size32_t keyLen = 0;
     size32_t firstUncompressed = 0;
     size32_t sizeCompressedPayload = 0;     // Set from closed compressor
+    offset_t totalUncompressedSize = 0;
     bool isVariable = false;
     bool rowCompression = false;
     bool useCompressedPayload = false;
@@ -317,6 +320,15 @@ class InplaceIndexCompressor : public CInterfaceOf<IIndexCompressor>
     {
         return new CInplaceBranchWriteNode(_fpos, _keyHdr, ctx);
     }
 
+    virtual offset_t queryBranchMemorySize() const override
+    {
+        return ctx.branchMemorySize;
+    }
+    virtual offset_t queryLeafMemorySize() const override
+    {
+        return ctx.leafMemorySize;
+    }
+
 protected:
     StringAttr compressionName;
     mutable InplaceKeyBuildContext ctx;
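The jhinplace changes above are the source of the new @branchMemorySize/@leafMemorySize values: each node writer adds the bytes the node will occupy once loaded to the shared InplaceKeyBuildContext, and leaf nodes also count the expanded payload when it will not be kept compressed. A minimal sketch of the accounting pattern, using illustrative names (BuildContext, LeafNode) rather than the real jhtree classes:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Each node writer adds its in-memory footprint to a shared build context;
// the compressor later exposes the totals through query methods.
struct BuildContext
{
    uint64_t branchMemorySize = 0; // bytes branch nodes will occupy in memory
    uint64_t leafMemorySize = 0;   // bytes leaf nodes will occupy in memory
};

struct LeafNode
{
    std::vector<uint8_t> data;          // serialized on-disk representation
    uint64_t totalUncompressedSize = 0; // accumulated as rows are added
    uint64_t keyedBytes = 0;            // keyed portion, already counted in data

    void noteWritten(BuildContext & ctx, bool payloadKeptCompressed)
    {
        if (!payloadKeptCompressed)
            ctx.leafMemorySize += totalUncompressedSize - keyedBytes; // expanded payload
        ctx.leafMemorySize += data.size();                            // the node itself
    }
};
```

The builder only records the attributes when they are non-zero, which is why gatherDerivedIndexInformation (earlier in this patch) can still fall back to its original estimates for indexes built by older compressors.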
diff --git a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp
index 79ce977dd51..085ef184497 100644
--- a/system/jhtree/jhtree.cpp
+++ b/system/jhtree/jhtree.cpp
@@ -1271,7 +1271,7 @@ CJHTreeNode *CKeyIndex::_loadNode(char *nodeData, offset_t pos, bool needsCopy)
 
 bool CKeyIndex::isTopLevelKey() const
 {
-    return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0;
+    return isTLK();
 }
 
 bool CKeyIndex::isFullySorted()
@@ -1283,6 +1283,7 @@ __uint64 CKeyIndex::getPartitionFieldMask()
 {
     return keyHdr->getPartitionFieldMask();
 }
+
 unsigned CKeyIndex::numPartitions()
 {
     return keyHdr->numPartitions();
@@ -1297,9 +1298,8 @@ IKeyCursor *CKeyIndex::getCursor(const IIndexFilterList *filter, bool logExcessi
 const CJHSearchNode *CKeyIndex::getNode(offset_t offset, NodeType type, IContextLogger *ctx) const
 {
     latestGetNodeOffset = offset;
-    const CJHTreeNode *node = cache->getNode(this, iD, offset, type, ctx, isTopLevelKey());
-    assertex(!node || type == node->getNodeType());
-    return (const CJHSearchNode *) node;
+    //Call isTLK() rather than isTopLevelKey() so the test is inlined (rather than a virtual)
+    return (CJHSearchNode *)cache->getNode(this, iD, offset, type, ctx, isTLK());
 }
 
 void CKeyIndex::dumpNode(FILE *out, offset_t pos, unsigned count, bool isRaw)
@@ -2611,18 +2611,20 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD,
     // Lock, add if missing, unlock.  Lock a page-dependent-cr load() release lock.
     //There will be the same number of critical section locks, but loading a page will contend on a different lock - so it should reduce contention.
     CKeyIdAndPos key(iD, pos);
+    CNodeMRUCache & curCache = cache[cacheType];
     CriticalSection & cacheLock = lock[cacheType];
     Owned<CNodeCacheEntry> ownedCacheEntry; // ensure node gets cleaned up if it fails to load
     bool alreadyExists = true;
     {
         CNodeCacheEntry * cacheEntry;
+        unsigned hashcode = curCache.getKeyHash(key);
 
         CriticalBlock block(cacheLock);
-        cacheEntry = cache[cacheType].query(key);
+        cacheEntry = curCache.query(hashcode, &key);
         if (unlikely(!cacheEntry))
         {
             cacheEntry = new CNodeCacheEntry;
-            cache[cacheType].replace(key, *cacheEntry);
+            curCache.replace(key, *cacheEntry);
             alreadyExists = false;
         }
 
@@ -2668,9 +2670,10 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD,
             if (!ownedCacheEntry->isReady())
             {
                 const CJHTreeNode *node = keyIndex->loadNode(&fetchCycles, pos);
+                assertex(type == node->getNodeType());
 
                 //Update the associated size of the entry in the hash table before setting isReady (never evicted until isReady is set)
-                cache[cacheType].noteReady(*node);
+                curCache.noteReady(*node);
                 ownedCacheEntry->noteReady(node);
             }
             else
@@ -2730,7 +2733,7 @@ const CJHTreeNode *CNodeCache::getNode(const INodeLoader *keyIndex, unsigned iD,
         {
             CriticalBlock block(cacheLock);
             if (!ownedCacheEntry->isReady())
-                cache[cacheType].remove(key);
+                curCache.remove(key);
         }
         throw;
     }
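CNodeCache::getNode above now hashes the key before entering the critical section and passes the precomputed hash to query(hashcode, &key), so only the table probe itself runs under the lock. A self-contained sketch of the same idea, using a standard-library bucket table as a stand-in for jlib's SuperHashTable (names are illustrative):

```cpp
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hash the key with no lock held, then pass the precomputed hash into the
// lookup so the critical section only covers the bucket walk.
class HashCache
{
    std::vector<std::list<std::pair<std::string, int>>> buckets;
    std::mutex lock;

public:
    explicit HashCache(size_t numBuckets = 256) : buckets(numBuckets) {}

    static size_t getKeyHash(const std::string & key)
    {
        return std::hash<std::string>{}(key);   // computed outside the lock
    }

    int * query(size_t hashcode, const std::string & key)
    {
        std::lock_guard<std::mutex> block(lock); // lock held only for the probe
        auto & bucket = buckets[hashcode % buckets.size()];
        for (auto & entry : bucket)
        {
            if (entry.first == key)
                return &entry.second;
        }
        return nullptr;
    }
};
```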
diff --git a/system/jhtree/jhtree.ipp b/system/jhtree/jhtree.ipp
index 0a0ca4e5773..83cf744fd0a 100644
--- a/system/jhtree/jhtree.ipp
+++ b/system/jhtree/jhtree.ipp
@@ -114,6 +114,8 @@ protected:
     void init(KeyHdr &hdr, bool isTLK);
     void loadBloomFilters();
     const CJHSearchNode *getRootNode() const;
+
+    inline bool isTLK() const { return (keyHdr->getKeyType() & HTREE_TOPLEVEL_KEY) != 0; }
 public:
     IMPLEMENT_IINTERFACE;
@@ -125,7 +127,7 @@ public:
     virtual size32_t keySize();
     virtual bool hasPayload();
     virtual size32_t keyedSize();
-    virtual bool isTopLevelKey() const override;
+    virtual bool isTopLevelKey() const override final;
     virtual bool isFullySorted() override;
     virtual __uint64 getPartitionFieldMask() override;
     virtual unsigned numPartitions() override;
diff --git a/system/jhtree/jhutil.hpp b/system/jhtree/jhutil.hpp
index 2062b741915..08b0adaa1ff 100644
--- a/system/jhtree/jhutil.hpp
+++ b/system/jhtree/jhutil.hpp
@@ -63,6 +63,10 @@ class CMRUCacheOf : public CInterface//, public IInterface
         table.replace(*mapping);
         mruList.enqueueHead(mapping);
     }
+    unsigned getKeyHash(KEY & key) const
+    {
+        return table.getHashFromFindParam(&key);
+    }
     ENTRY *query(KEY key, bool doPromote=true)
     {
         MAPPING *mapping = table.find(key);
@@ -72,6 +76,15 @@ class CMRUCacheOf : public CInterface//, public IInterface
             promote(mapping);
         return &mapping->query(); // MAPPING must impl. query()
     }
+    ENTRY *query(unsigned hashcode, KEY * key, bool doPromote=true)
+    {
+        MAPPING *mapping = table.find(hashcode, *key);
+        if (!mapping) return NULL;
+
+        if (doPromote)
+            promote(mapping);
+        return &mapping->query(); // MAPPING must impl. query()
+    }
     ENTRY *get(KEY key, bool doPromote=true)
     {
         return LINK(query(key, doPromote));
@@ -96,11 +109,7 @@ class CMRUCacheOf : public CInterface//, public IInterface
     void kill() { clear(-1); }
     void promote(MAPPING *mapping)
     {
-        if (mruList.head() != mapping)
-        {
-            mruList.dequeue(mapping); // will still be linked in table
-            mruList.enqueueHead(mapping);
-        }
+        mruList.moveToHead(mapping);
     }
     CMRUIterator *getIterator()
     {
diff --git a/system/jhtree/keybuild.cpp b/system/jhtree/keybuild.cpp
index e894a24771a..acdb0f0aee2 100644
--- a/system/jhtree/keybuild.cpp
+++ b/system/jhtree/keybuild.cpp
@@ -85,6 +85,14 @@ class PocIndexCompressor : public CInterfaceOf<IIndexCompressor>
         else
             return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode);
     }
+    virtual offset_t queryBranchMemorySize() const override
+    {
+        return 0;
+    }
+    virtual offset_t queryLeafMemorySize() const override
+    {
+        return 0;
+    }
 };
 
 class LegacyIndexCompressor : public CInterfaceOf<IIndexCompressor>
@@ -94,6 +102,14 @@ class LegacyIndexCompressor : public CInterfaceOf<IIndexCompressor>
     {
         return new CLegacyWriteNode(_fpos, _keyHdr, isLeafNode);
     }
+    virtual offset_t queryBranchMemorySize() const override
+    {
+        return 0; // same as default calculation
+    }
+    virtual offset_t queryLeafMemorySize() const override
+    {
+        return 0; // MORE: Update for in-place row compression
+    }
 };
 
 class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
@@ -601,6 +617,8 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
     virtual unsigned __int64 getNumBranchNodes() const override { return numBranches; }
     virtual unsigned __int64 getNumBlobNodes() const override { return numBlobs; }
     virtual unsigned __int64 getOffsetBranches() const override { return offsetBranches; }
+    virtual unsigned __int64 getBranchMemorySize() const override { return indexCompressor->queryBranchMemorySize(); }
+    virtual unsigned __int64 getLeafMemorySize() const override { return indexCompressor->queryLeafMemorySize(); }
 
 protected:
     void writeMetadata(char const * data, size32_t size)
diff --git a/system/jhtree/keybuild.hpp b/system/jhtree/keybuild.hpp
index 727f27b2ad7..d5d8c22a389 100644
--- a/system/jhtree/keybuild.hpp
+++ b/system/jhtree/keybuild.hpp
@@ -104,6 +104,8 @@ interface IKeyBuilder : public IInterface
     virtual unsigned __int64 getNumBranchNodes() const = 0;
     virtual unsigned __int64 getNumBlobNodes() const = 0;
     virtual unsigned __int64 getOffsetBranches() const = 0;
+    virtual unsigned __int64 getBranchMemorySize() const = 0;
+    virtual unsigned __int64 getLeafMemorySize() const = 0;
 };
 
 extern jhtree_decl IKeyBuilder *createKeyBuilder(IFileIOStream *_out, unsigned flags, unsigned rawSize, unsigned nodeSize, unsigned keyFieldSize, unsigned __int64 startSequence, IHThorIndexWriteArg *helper, const char * defaultCompression, bool enforceOrder, bool isTLK);
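CMRUCacheOf::promote above is reduced to a single moveToHead call (the DListOf::moveToHead added later in this patch, in jqueue.hpp). The operation is an unlink-and-splice that does nothing when the entry is already most recently used; a sketch with a minimal doubly-linked list, illustrative rather than the jlib container:

```cpp
// Minimal doubly-linked list showing the MRU promotion: unlink the node and
// splice it at the head, skipping all work when it is already the head.
struct Node
{
    Node *prev = nullptr;
    Node *next = nullptr;
};

struct MRUList
{
    Node *head = nullptr;
    Node *tail = nullptr;

    void moveToHead(Node *n)
    {
        if (head == n)
            return;                 // already most recently used
        // unlink: n is not the head, so n->prev is non-null for a node in the list
        if (n->next)
            n->next->prev = n->prev;
        else
            tail = n->prev;         // n was the tail
        n->prev->next = n->next;
        // splice at head (head is non-null because the list contains n)
        n->prev = nullptr;
        n->next = head;
        head->prev = n;
        head = n;
    }
};
```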
diff --git a/system/jlib/jfcmp.hpp b/system/jlib/jfcmp.hpp
index 3e5b96ce618..f3d407452f3 100644
--- a/system/jlib/jfcmp.hpp
+++ b/system/jlib/jfcmp.hpp
@@ -156,7 +156,10 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>
     {
         if (trailing)
             return written;
-        flushcommitted();
+
+        if (inlen == inmax)
+            flushcommitted();
+
         if (lenb+inlen>inmax)
         {
             if (outBufMb) // sizing input buffer, but outBufMb!=NULL is condition of whether in use or not
@@ -211,7 +214,7 @@ class jlib_decl CFcmpCompressor : public CSimpleInterfaceOf<ICompressor>
 };
 
 
-class jlib_decl CFcmpExpander : public CSimpleInterfaceOf<IExpander>
+class jlib_decl CFcmpExpander : public CExpanderBase
 {
 protected:
     byte *outbuf;
@@ -241,50 +244,6 @@ class jlib_decl CFcmpExpander : public CSimpleInterfaceOf<IExpander>
         return outlen;
     }
 
-    virtual void expand(void *buf)
-    {
-        if (!outlen)
-            return;
-        if (buf)
-        {
-            if (bufalloc)
-                free(outbuf);
-            bufalloc = 0;
-            outbuf = (unsigned char *)buf;
-        }
-        else if (outlen>bufalloc)
-        {
-            if (bufalloc)
-                free(outbuf);
-            bufalloc = outlen;
-            outbuf = (unsigned char *)malloc(bufalloc);
-            if (!outbuf)
-                throw MakeStringException(MSGAUD_operator,0, "Out of memory in FcmpExpander::expand, requesting %d bytes", bufalloc);
-        }
-        size32_t done = 0;
-        for (;;)
-        {
-            const size32_t szchunk = *in;
-            in++;
-            if (szchunk+done<outlen)
-            {
-                size32_t written = expandDirect(outlen-done, outbuf+done, szchunk, in);
-                done += written;
-                if ((written==0)||(done>outlen))
-                    throw MakeStringException(0, "FcmpExpander - corrupt data(1) %d %d",written,szchunk);
-            }
-            else
-            {
-                if (szchunk+done!=outlen)
-                    throw MakeStringException(0, "FcmpExpander - corrupt data(2) %d %d",szchunk,outlen);
-                memcpy((byte *)buf+done,in,szchunk);
-                break;
-            }
-            in = (const size32_t *)(((const byte *)in)+szchunk);
-        }
-    }
-
     virtual void *bufptr() { return outbuf;}
     virtual size32_t buflen() { return outlen;}
 };
diff --git a/system/jlib/jflz.cpp b/system/jlib/jflz.cpp
index a809964c540..078229722ba 100644
--- a/system/jlib/jflz.cpp
+++ b/system/jlib/jflz.cpp
@@ -687,7 +687,7 @@ class CFastLZCompressor final : public CFcmpCompressor
 class jlib_decl CFastLZExpander : public CFcmpExpander
 {
 public:
-    virtual void expand(void *buf)
+    virtual void expand(void *buf) override
     {
         if (!outlen)
             return;
diff --git a/system/jlib/jlz4.cpp b/system/jlib/jlz4.cpp
index 92f3a8134d6..3b684a3bd60 100644
--- a/system/jlib/jlz4.cpp
+++ b/system/jlib/jlz4.cpp
@@ -177,8 +177,9 @@ class CLZ4Compressor final : public CFcmpCompressor
 
 class jlib_decl CLZ4Expander : public CFcmpExpander
 {
+    size32_t totalExpanded = 0;
 public:
-    virtual void expand(void *buf)
+    virtual void expand(void *buf) override
     {
         if (!outlen)
             return;
@@ -221,6 +222,66 @@ class jlib_decl CLZ4Expander : public CFcmpExpander
         }
     }
 
+    virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override
+    {
+        init(src);
+        totalExpanded = 0;
+        return expandNext(target);
+    }
+
+    virtual size32_t expandNext(MemoryBuffer & target) override
+    {
+        if (totalExpanded == outlen)
+            return 0;
+
+        const size32_t szchunk = *in;
+        in++;
+
+        target.clear();
+        size32_t written;
+        if (szchunk+totalExpanded<outlen)
+        {
+            //Guess the expanded size, bounded by the expanded data remaining
+            size32_t maxOut = target.capacity();
+            size32_t maxEstimate = (outlen - totalExpanded);
+            size32_t estimate = szchunk*4;
+            if (estimate > maxEstimate)
+                estimate = maxEstimate;
+            if (maxOut < estimate)
+                maxOut = estimate;
+
+            for (;;)
+            {
+                //Try and decompress into the current target buffer.  If too small increase size and repeat
+                written = LZ4_decompress_safe((const char *)in, (char *)target.reserve(maxOut), szchunk, maxOut);
+                if ((int)written > 0)
+                {
+                    target.setLength(written);
+                    break;
+                }
+
+                //Sanity check to catch corrupt lz4 data that always returns an error.
+                if (maxOut > outlen)
+                    throwUnexpected();
+
+                maxOut += szchunk; // Likely to quickly approach the actual expanded size
+                target.clear();
+            }
+        }
+        else
+        {
+            void * buf = target.reserve(szchunk);
+            written = szchunk;
+            memcpy(buf,in,szchunk);
+        }
+
+        in = (const size32_t *)(((const byte *)in)+szchunk);
+        totalExpanded += written;
+        if (totalExpanded > outlen)
+            throw MakeStringException(0, "LZ4Expander - corrupt data(3) %d %d",written,szchunk);
+        return written;
+    }
 };
 
 void LZ4CompressToBuffer(MemoryBuffer & out, size32_t len, const void * src)
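expandFirst/expandNext split decompression into one chunk per call, so a caller can stop as soon as the bytes it needs have been produced instead of expanding the whole block up front. A sketch of the contract using a hypothetical chunked format (the real CLZ4Expander tracks byte offsets within the block and calls LZ4_decompress_safe):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Illustrative expander: expandFirst resets the cursor and expands only the
// first chunk; expandNext expands one further chunk, returning 0 at the end.
struct ChunkedExpander
{
    std::vector<std::string> chunks;   // stand-in for the compressed chunks
    size_t nextChunk = 0;

    size_t expandFirst(std::string & target, const std::vector<std::string> & src)
    {
        chunks = src;
        nextChunk = 0;
        return expandNext(target);
    }

    size_t expandNext(std::string & target)
    {
        if (nextChunk == chunks.size())
            return 0;                  // block exhausted
        target = chunks[nextChunk++];  // real code decompresses here
        return target.size();
    }
};
```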
diff --git a/system/jlib/jlzw.cpp b/system/jlib/jlzw.cpp
index 8e343cecb6d..46604389dcf 100644
--- a/system/jlib/jlzw.cpp
+++ b/system/jlib/jlzw.cpp
@@ -448,6 +448,20 @@ void CLZWCompressor::close()
     }
 }
 
+
+size32_t CExpanderBase::expandFirst(MemoryBuffer & target, const void * src)
+{
+    size32_t size = init(src);
+    void * buffer = target.reserve(size);
+    expand(buffer);
+    return size;
+}
+
+size32_t CExpanderBase::expandNext(MemoryBuffer & target)
+{
+    return 0;
+}
+
 CLZWExpander::CLZWExpander(bool _supportbigendian)
 {
     outbuf = NULL;
@@ -1467,7 +1481,7 @@ class jlib_decl CRDiffCompressor : public ICompressor, public CInterface
 };
 
 
-class jlib_decl CRDiffExpander : public IExpander, public CInterface
+class jlib_decl CRDiffExpander : public CExpanderBase
 {
     unsigned char *outbuf;
     size32_t outlen;
@@ -1475,8 +1489,6 @@ class jlib_decl CRDiffExpander : public IExpander, public CInterface
     unsigned char *in;
     size32_t recsize;
 public:
-    IMPLEMENT_IINTERFACE;
-
     CRDiffExpander()
     {
         outbuf = NULL;
@@ -1987,8 +1999,12 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
     bool writeException;
     Owned<ICompressor> compressor;
     Owned<IExpander> expander;
+    MemoryAttr compressedInputBlock;
     unsigned compMethod;
     offset_t lastFlushPos = (offset_t)-1;
+    offset_t nextExpansionPos = (offset_t)-1;
+    offset_t startBlockPos = (offset_t)-1;
+    size32_t fullBlockSize = 0;
 
     unsigned indexNum() { return indexbuf.length()/sizeof(offset_t); }
 
@@ -2017,6 +2033,43 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
     void getblock(offset_t pos)
     {
         curblockbuf.clear();
+
+        //If the blocks are being expanded incrementally check if the position is within the current block
+        //This test will never be true for row compressed data, or non-incremental decompression
+        if ((pos >= startBlockPos) && (pos < startBlockPos + fullBlockSize))
+        {
+            if (pos < nextExpansionPos)
+            {
+                //Start decompressing again and avoid re-reading the data from disk
+                const void * rawData;
+                if (fileio)
+                    rawData = compressedInputBlock.get();
+                else
+                    rawData = mmfile->base()+startBlockPos;
+
+                assertex(rawData);
+                size32_t exp = expander->expandFirst(curblockbuf, rawData);
+                curblockpos = startBlockPos;
+                nextExpansionPos = startBlockPos + exp;
+                if (pos < nextExpansionPos)
+                    return;
+
+                curblockbuf.clear();
+            }
+
+            for (;;)
+            {
+                size32_t nextSize = expander->expandNext(curblockbuf);
+                if (nextSize == 0)
+                    throwUnexpected(); // Should have failed the outer block test if nextSize is 0
+
+                curblockpos = nextExpansionPos;
+                nextExpansionPos = nextExpansionPos+nextSize;
+                if (pos < nextExpansionPos)
+                    return;
+            }
+        }
+
         size32_t expsize;
         curblocknum = lookupIndex(pos,curblockpos,expsize);
         size32_t toread = trailer.blockSize;
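The getblock changes above remember how far the current block has been expanded (startBlockPos/nextExpansionPos) so a read within the same block resumes expansion rather than re-reading and re-expanding from the start. A sketch of that resume logic; expandNext here stands in for IExpander::expandNext and the structure is illustrative, not the CCompressedFile code:

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Track the offsets covered by the most recently expanded chunk and only
// expand further chunks until the requested offset is covered.
struct BlockReader
{
    std::function<size_t(std::string &)> expandNext; // fills the next chunk, 0 at end
    std::string current;          // most recently expanded chunk
    size_t curChunkPos = 0;       // block offset of `current`
    size_t nextExpansionPos = 0;  // offset one past the expanded data

    // Returns true when `pos` falls inside `current` on exit.
    bool seekForward(size_t pos)
    {
        while (pos >= nextExpansionPos)
        {
            size_t expanded = expandNext(current);
            if (expanded == 0)
                return false;     // past the end of this block
            curChunkPos = nextExpansionPos;
            nextExpansionPos += expanded;
        }
        return pos >= curChunkPos; // false means pos is behind the cursor
    }
};
```

Seeking backwards within the block restarts decompression from the block start, which is the `pos < nextExpansionPos` branch in the real code; crucially it reuses the cached compressed input rather than re-reading it from disk.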
@@ -2027,8 +2080,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
         if (!toread)
             return;
         if (fileio) {
-            MemoryAttr comp;
-            void *b=comp.allocate(toread);
+            //Allocate on the first call, reuse on subsequent calls.
+            void * b = compressedInputBlock.allocate(trailer.blockSize);
+
             size32_t r = fileio->read(p,toread,b);
             assertex(r==toread);
             expand(b,curblockbuf,expsize);
@@ -2070,11 +2124,10 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
         }
         else { // lzw or fastlz or lz4
             assertex(expander.get());
-            size32_t exp = expander->init(compbuf);
-            if (exp!=expsize) {
-                throw MakeStringException(-1,"Compressed file format failure(%d,%d) - Encrypted?",exp,expsize);
-            }
-            expander->expand(expbuf.reserve(exp));
+            size32_t exp = expander->expandFirst(expbuf, compbuf);
+            startBlockPos = curblockpos;
+            nextExpansionPos = startBlockPos + exp;
+            fullBlockSize = expsize;
         }
     }
 
@@ -2224,6 +2277,9 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface
                 compMethod = COMPRESS_METHOD_LZW;
                 expander.setown(createLZWExpander(true));
             }
+            //Preallocate the expansion target to the block size - to ensure it is the right size and
+            //avoid reallocation when expanding lz4
+            curblockbuf.ensureCapacity(trailer.blockSize);
         }
     }
 }
@@ -2685,13 +2741,12 @@ class CAESCompressor : implements ICompressor, public CInterface
     virtual CompressionMethod getCompressionMethod() const override { return (CompressionMethod)(COMPRESS_METHOD_AES | comp->getCompressionMethod()); }
 };
 
-class CAESExpander : implements IExpander, public CInterface
+class CAESExpander : implements CExpanderBase
 {
     Owned<IExpander> exp; // base expander
     MemoryBuffer compbuf;
     MemoryAttr key;
 public:
-    IMPLEMENT_IINTERFACE;
     CAESExpander(const void *_key, unsigned _keylen)
         : key(_keylen,_key)
     {
diff --git a/system/jlib/jlzw.hpp b/system/jlib/jlzw.hpp
index 72816bc4370..1d7c7196390 100644
--- a/system/jlib/jlzw.hpp
+++ b/system/jlib/jlzw.hpp
@@ -66,6 +66,8 @@ interface jlib_decl IExpander : public IInterface
     virtual void expand(void *target)=0;
     virtual void * bufptr()=0;
     virtual size32_t buflen()=0;
+    virtual size32_t expandFirst(MemoryBuffer & target, const void * src) = 0;
+    virtual size32_t expandNext(MemoryBuffer & target) = 0;
 };
 
 
@@ -82,6 +84,13 @@ interface jlib_decl IRandRowExpander : public IInterface
 };
 
 
+class jlib_decl CExpanderBase : public CInterfaceOf<IExpander>
+{
+public:
+    //Provide default implementations
+    virtual size32_t expandFirst(MemoryBuffer & target, const void * src) override;
+    virtual size32_t expandNext(MemoryBuffer & target) override;
+};
 
 extern jlib_decl ICompressor *createLZWCompressor(bool supportbigendian=false); // bigendiansupport required for cross platform with solaris
diff --git a/system/jlib/jlzw.ipp b/system/jlib/jlzw.ipp
index 0e4ce4fcf7a..1f562b7d6cf 100644
--- a/system/jlib/jlzw.ipp
+++ b/system/jlib/jlzw.ipp
@@ -38,7 +38,6 @@ public:
     unsigned char dictchar[LZW_HASH_TABLE_SIZE];
 };
 
-
 class CLZWCompressor final : public ICompressor, public CInterface
 {
 public:
@@ -88,12 +87,9 @@ protected:
     bool supportbigendian;
 };
 
-
-class jlib_decl CLZWExpander : public IExpander, public CInterface
+class CLZWExpander : public CExpanderBase
 {
 public:
-    IMPLEMENT_IINTERFACE;
-
     CLZWExpander(bool _supportbigendian);
     ~CLZWExpander();
     virtual size32_t init(const void *blk); // returns size required
diff --git a/system/jlib/jptree-attrs.hpp b/system/jlib/jptree-attrs.hpp
index c841f090ac4..ac52f6e859f 100644
--- a/system/jlib/jptree-attrs.hpp
+++ b/system/jlib/jptree-attrs.hpp
@@ -1,6 +1,7 @@
     "@accessed",
     "@activity",
     "@agentSession",
+    "@branchMemorySize",
     "@buildVersion",
     "@checkSum",
     "@cloneable",
@@ -38,6 +39,7 @@
     "@isScalar",
     "@jobName",
     "@keyedSize",
+    "@leafMemorySize",
     "@libCount",
     "@localValue",
     "@mapFlags",
diff --git a/system/jlib/jqueue.hpp b/system/jlib/jqueue.hpp
index aeba567c46e..f00bd4a666d 100644
--- a/system/jlib/jqueue.hpp
+++ b/system/jlib/jqueue.hpp
@@ -447,6 +447,28 @@ class DListOf
         }
         numEntries++;
     }
+    void moveToHead(ELEMENT * element)
+    {
+        if (likely(pHead != element))
+        {
+            //Initial code from remove() - simplified since pHead != element, and no decrement of entries
+            ELEMENT * next = element->next;
+            ELEMENT * prev = element->prev;
+            assertex(prev || next);
+            if (element == pTail) // would if (!next) avoid loading pTail?
+                pTail = prev;
+            if (next)
+                next->prev = prev;
+            if (prev)
+                prev->next = next;
+
+            //enqueueHead() - simplified since pHead must be set, and no increment of number of entries
+            pHead->prev = element;
+            element->next = pHead;
+            element->prev = nullptr;
+            pHead = element;
+        }
+    }
     ELEMENT *head() const { return pHead; }
     ELEMENT *tail() const { return pTail; }
     void remove(ELEMENT *element)
diff --git a/system/jlib/jqueue.tpp b/system/jlib/jqueue.tpp
index 62494057e91..636d6d152fa 100644
--- a/system/jlib/jqueue.tpp
+++ b/system/jlib/jqueue.tpp
@@ -33,7 +33,7 @@ class QueueOf
     unsigned headp;
     unsigned tailp;
     unsigned max;
-    unsigned num;
+    RelaxedAtomic<unsigned> num; // atomic so that it can be read without a critical section
     void expand()
     {
         unsigned inc;
@@ -87,7 +87,7 @@ public:
                 tailp=0;
             }
             ptrs[tailp] = e;
-            num++;
+            num.fastAdd(1); // Do not use increment which is atomic
         }
     }
     void enqueueHead(BASE *e)
@@ -105,7 +105,7 @@ public:
                 headp--;
             }
             ptrs[headp] = e;
-            num++;
+            num.fastAdd(1); // Do not use increment which is atomic
         }
     }
     void enqueue(BASE *e,unsigned i)
@@ -133,7 +133,7 @@ public:
                 p = n;
             } while (p!=i);
             ptrs[i] = e;
-            num++;
+            num.fastAdd(1); // Do not use increment which is atomic
         }
       }
     }
@@ -159,7 +159,7 @@ public:
         headp++;
         if (headp==max)
             headp = 0;
-        num--;
+        num.fastAdd(-1); // Do not use decrement which is atomic
         return ret;
     }
     BASE *dequeueTail()
@@ -170,7 +170,7 @@ public:
         if (tailp==0)
             tailp=max;
         tailp--;
-        num--;
+        num.fastAdd(-1); // Do not use decrement which is atomic
        return ret;
     }
     BASE *dequeue(unsigned i)
@@ -196,7 +196,7 @@ public:
         headp++;
         if (headp==max)
             headp = 0;
-        num--;
+        num.fastAdd(-1); // Do not use decrement which is atomic
         return ret;
     }
     void set(unsigned idx, BASE *v)
@@ -270,7 +270,7 @@ public:
     BASE *dequeue(unsigned i) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeue(i); }
     unsigned find(BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::find(e); }
     void dequeue(BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::dequeue(e); }
-    inline unsigned ordinality() const { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::ordinality(); }
+    inline unsigned ordinality() const { return QueueOf<BASE, ALLOWNULLS>::ordinality(); }
     void set(unsigned idx, BASE *e) { CriticalBlock b(crit); return QueueOf<BASE, ALLOWNULLS>::set(idx, e); }
 };
diff --git a/system/jlib/jsuperhash.hpp b/system/jlib/jsuperhash.hpp
index e8da89501b7..b9c209f6760 100644
--- a/system/jlib/jsuperhash.hpp
+++ b/system/jlib/jsuperhash.hpp
@@ -162,11 +162,13 @@ class SuperHashTableOf : public SuperHashTable
 };
 
 
-// Macro to provide find method taking reference instead of pointer
+// Macro to provide find method taking reference instead of pointer (yuk!)
 #define IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(ET, FP)              \
     inline ET * find(FP & fp) const                              \
-      { return SuperHashTableOf<ET, FP>::find(&fp); }
+      { return SuperHashTableOf<ET, FP>::find(&fp); }            \
+    inline ET * find(unsigned hash, FP & fp) const               \
+      { return SuperHashTableOf<ET, FP>::find(hash, &fp); }
 
 
 // simple type hashing HT impl.
diff --git a/thorlcr/activities/indexwrite/thindexwrite.cpp b/thorlcr/activities/indexwrite/thindexwrite.cpp
index ceeefbffcc7..859a987041f 100644
--- a/thorlcr/activities/indexwrite/thindexwrite.cpp
+++ b/thorlcr/activities/indexwrite/thindexwrite.cpp
@@ -38,6 +38,8 @@ class IndexWriteActivityMaster : public CMasterActivity
     offset_t compressedFileSize = 0;
     offset_t uncompressedSize = 0;
     offset_t originalBlobSize = 0;
+    offset_t branchMemorySize = 0;
+    offset_t leafMemorySize = 0;
     Owned<IFileDescriptor> fileDesc;
     bool buildTlk, isLocal, singlePartKey;
     StringArray clusters;
@@ -265,6 +267,10 @@ class IndexWriteActivityMaster : public CMasterActivity
             props.setPropInt64("@numBlobNodes", numBlobNodes);
             if (numBlobNodes)
                 props.setPropInt64("@originalBlobSize", originalBlobSize);
+            if (branchMemorySize)
+                props.setPropInt64("@branchMemorySize", branchMemorySize);
+            if (leafMemorySize)
+                props.setPropInt64("@leafMemorySize", leafMemorySize);
 
             Owned<IPropertyTree> metadata;
             buildUserMetadata(metadata, *helper);
@@ -345,12 +351,16 @@ class IndexWriteActivityMaster : public CMasterActivity
             offset_t slaveOffsetBranches;
             offset_t slaveUncompressedSize;
             offset_t slaveOriginalBlobSize;
+            offset_t slaveBranchMemorySize;
+            offset_t slaveLeafMemorySize;
             mb.read(slaveNumLeafNodes);
             mb.read(slaveNumBlobNodes);
             mb.read(slaveNumBranchNodes);
             mb.read(slaveOffsetBranches);
             mb.read(slaveUncompressedSize);
             mb.read(slaveOriginalBlobSize);
+            mb.read(slaveBranchMemorySize);
+            mb.read(slaveLeafMemorySize);
 
             compressedFileSize += size;
             numLeafNodes += slaveNumLeafNodes;
@@ -358,6 +368,8 @@ class IndexWriteActivityMaster : public CMasterActivity
             numBranchNodes += slaveNumBranchNodes;
             uncompressedSize += slaveUncompressedSize;
             originalBlobSize += slaveOriginalBlobSize;
+            branchMemorySize += slaveBranchMemorySize;
+            leafMemorySize += slaveLeafMemorySize;
 
             props.setPropInt64("@uncompressedSize", slaveUncompressedSize);
             props.setPropInt64("@offsetBranches", slaveOffsetBranches);
diff --git a/thorlcr/activities/indexwrite/thindexwriteslave.cpp b/thorlcr/activities/indexwrite/thindexwriteslave.cpp
index c827663568c..8b09d756023 100644
--- a/thorlcr/activities/indexwrite/thindexwriteslave.cpp
+++ b/thorlcr/activities/indexwrite/thindexwriteslave.cpp
@@ -60,6 +60,8 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
     offset_t offsetBranches = 0;
     offset_t uncompressedSize = 0;
     offset_t originalBlobSize = 0;
+    offset_t branchMemorySize = 0;
+    offset_t leafMemorySize = 0;
 
     MemoryBuffer rowBuff;
     OwnedConstThorRow lastRow, firstRow;
@@ -241,6 +243,8 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
                 numBranchNodes = builder->getNumBranchNodes();
                 numBlobNodes = builder->getNumBlobNodes();
                 offsetBranches = builder->getOffsetBranches();
+                branchMemorySize = builder->getBranchMemorySize();
+                leafMemorySize = builder->getLeafMemorySize();
             }
         }
     }
@@ -633,6 +637,8 @@ class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadSt
         mb.append(offsetBranches);
         mb.append(uncompressedSize);
         mb.append(originalBlobSize);
+        mb.append(branchMemorySize);
+        mb.append(leafMemorySize);
 
         if (!singlePartKey && firstNode() && buildTlk)
         {
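The new branch/leaf memory sizes ride the existing worker-to-master protocol: each Thor slave appends its values to a MemoryBuffer in a fixed order, and the master reads them back in exactly the same order before summing. A minimal stand-in sketch (Buffer is illustrative, not jlib's MemoryBuffer):

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Fixed-order binary serialization: append on the worker, read back in the
// identical order on the master, then accumulate the totals.
struct Buffer
{
    std::vector<uint8_t> data;
    size_t readPos = 0;

    void append(uint64_t v)
    {
        const uint8_t *p = reinterpret_cast<const uint8_t *>(&v);
        data.insert(data.end(), p, p + sizeof(v));
    }
    void read(uint64_t & v)
    {
        std::memcpy(&v, data.data() + readPos, sizeof(v));
        readPos += sizeof(v);
    }
};

// Master side: the read order and count must match the worker's append order.
inline void mergeSlaveStats(Buffer & mb, uint64_t & branchTotal, uint64_t & leafTotal)
{
    uint64_t slaveBranch = 0, slaveLeaf = 0;
    mb.read(slaveBranch);
    mb.read(slaveLeaf);
    branchTotal += slaveBranch;
    leafTotal += slaveLeaf;
}
```

This is why the patch appends the two new fields at the end of the existing sequence on the slave side and reads them last on the master side: extending the message keeps older fields at their old offsets.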
diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp
index c1d75a6cde3..9d52f3bbd1d 100644
--- a/thorlcr/master/thgraphmanager.cpp
+++ b/thorlcr/master/thgraphmanager.cpp
@@ -1098,7 +1098,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
             cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines));
             if (cost)
                 wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
-
+            updateSpillSize(wu, graphScope, SSTgraph);
             removeJob(*job);
         }
         catch (IException *e)
diff --git a/thorlcr/msort/tsorts.cpp b/thorlcr/msort/tsorts.cpp
index 09164448530..41d8b798002 100644
--- a/thorlcr/msort/tsorts.cpp
+++ b/thorlcr/msort/tsorts.cpp
@@ -33,6 +33,7 @@
 #include "tsortm.hpp"
 #include "tsortmp.hpp"
 #include "thbuf.hpp"
+#include "thbufdef.hpp"
 #include "thgraph.hpp"
 
 #ifdef _DEBUG
@@ -198,6 +199,7 @@ class CWriteIntercept : public CSimpleInterface
         dataFile.setown(createIFile(tempname.str()));
 
         unsigned rwFlags = DEFAULT_RWFLAGS;
+        size32_t compBlkSz = 0;
         if (activity.getOptBool(THOROPT_COMPRESS_SPILLS, true) && activity.getOptBool(THOROPT_COMPRESS_SORTOVERFLOW, true))
         {
             StringBuffer compType;
@@ -209,11 +211,12 @@ class CWriteIntercept : public CSimpleInterface
                 rwFlags |= rw_compress;
                 rwFlags |= spillCompInfo;
                 compressedOverflowFile = true;
-                ActPrintLog(&activity, "Creating compressed merged overflow file");
+                compBlkSz = activity.getOptUInt(THOROPT_SORT_COMPBLKSZ, DEFAULT_SORT_COMPBLKSZ);
+                ActPrintLog(&activity, "Creating compressed merged overflow file (block size = %u)", compBlkSz);
             }
         }
 
-        Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf, rwFlags);
+        Owned<IExtRowWriter> output = createRowWriter(dataFile, rowIf, rwFlags, nullptr, compBlkSz);
         bool overflowed = false;
         ActPrintLog(&activity, "Local Overflow Merge start");
diff --git a/thorlcr/thorutil/thbufdef.hpp b/thorlcr/thorutil/thbufdef.hpp
index a5291a30364..bb61498962c 100644
--- a/thorlcr/thorutil/thbufdef.hpp
+++ b/thorlcr/thorutil/thbufdef.hpp
@@ -54,6 +54,7 @@
 #define EXCESSIVE_PARALLEL_THRESHHOLD         (0x500000)            // 5MB
 #define LOOP_SMART_BUFFER_SIZE                (0x100000*12)         // 12MB
 #define LOCALRESULT_BUFFER_SIZE               (0x100000*10)         // 10MB
+#define DEFAULT_SORT_COMPBLKSZ                (0x10000)             // 64K
 
 #define DEFAULT_KEYNODECACHEMB 10
 #define DEFAULT_KEYLEAFCACHEMB 50
diff --git a/thorlcr/thorutil/thmem.cpp b/thorlcr/thorutil/thmem.cpp
index c47244c2d65..5647704fdc6 100644
--- a/thorlcr/thorutil/thmem.cpp
+++ b/thorlcr/thorutil/thmem.cpp
@@ -61,8 +61,6 @@ static CriticalSection MTcritsect;  // held when blocked
 static Owned<ILargeMemLimitNotify> MTthresholdnotify;
 static bool MTlocked = false;
 
-#define DEFAULT_SORT_COMPBLKSZ 0x10000 // 64K
-
 void checkMultiThorMemoryThreshold(bool inc)
 {
     if (MTthresholdnotify.get()) {
diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp
index 6fafd660216..7d8c76fa284 100644
--- a/thorlcr/thorutil/thormisc.cpp
+++ b/thorlcr/thorutil/thormisc.cpp
@@ -77,7 +77,10 @@ const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed,
 const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
     StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits,
     StCycleNodeLoadCycles, StCycleLeafLoadCycles, StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles,
     StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
-    StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles});
+    StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles,
+    StCycleIndexCacheBlockedCycles, StNumIndexMerges, StNumIndexMergeCompares,
+    StNumIndexSkips, StNumIndexNullSkips});
+
 const StatisticsMapping basicActivityStatistics({StTimeLocalExecute, StTimeBlocked});
 const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
 const StatisticsMapping hashJoinActivityStatistics({StNumLeftRows, StNumRightRows}, basicActivityStatistics);
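A closing note on merge semantics: gatherSpillSize (in workunit.cpp above) records the peak StSizeGraphSpill across sibling scopes and publishes it with StatsMergeMax, whereas costs such as StCostFileAccess are published with StatsMergeReplace after summing. A small sketch of why the distinction matters when aggregating scoped statistics (illustrative types, not the workunit API):

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

using stat_type = uint64_t;

// Spill sizes describe concurrent use of the same disk, so sibling scopes are
// combined by taking the maximum (StatsMergeMax): peaks do not add up.
stat_type peakOverScopes(const std::map<std::string, stat_type> & spillByScope)
{
    stat_type peak = 0;
    for (const auto & entry : spillByScope)
        peak = std::max(peak, entry.second);
    return peak;
}

// Costs accrue independently in each scope, so they are summed before being
// written back (and a rewrite replaces the previous value).
stat_type totalOverScopes(const std::map<std::string, stat_type> & costByScope)
{
    stat_type total = 0;
    for (const auto & entry : costByScope)
        total += entry.second;
    return total;
}
```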