[sweep:integration] Improve getTransformationFiles performance #7833

Merged
src/DIRAC/TransformationSystem/Client/TransformationClient.py (41 additions, 52 deletions)
```diff
@@ -163,6 +163,7 @@ def getTransformationFiles(
         timeout=1800,
         offset=0,
         maxfiles=None,
+        columns=None,
     ):
         """gets all the transformation files for a transformation, incrementally.
         "limit" here is just used to determine the offset.
@@ -173,34 +174,39 @@ def getTransformationFiles(
             condDict = {}
         if timeStamp is None:
             timeStamp = "LastUpdate"
-        # getting transformationFiles - incrementally
-        if "LFN" in condDict:
-            if isinstance(condDict["LFN"], str):
-                lfnList = [condDict["LFN"]]
-            else:
-                lfnList = sorted(condDict["LFN"])
-            # If a list of LFNs is given, use chunks of 1000 only
-            limit = limit if limit else 1000
+
+        if "LFN" not in condDict:
+            res = rpcClient.getTransformationFiles(
+                condDict, older, newer, timeStamp, orderAttribute, offset, maxfiles, columns
+            )
+            # TransformationDB.getTransformationFiles includes a "Records"/"ParameterNames"
+            # that we don't want to return to the client so explicitly return S_OK with the value
+            if not res["OK"]:
+                return res
+            return S_OK(res["Value"])
+
+        # If LFNs requested, request in small batches, because...
+        # Probably not needed? Because this should always be a list
+        if isinstance(condDict["LFN"], str):
+            lfnList = [condDict["LFN"]]
         else:
-            # By default get by chunks of 10000 files
-            lfnList = []
-            limit = limit if limit else 10000
+            lfnList = sorted(condDict["LFN"])
+        # If a list of LFNs is given, default to chunks of 1000 only
+        limit = limit if limit else 1000
+
         transID = condDict.get("TransformationID", "Unknown")
         offsetToApply = offset
         retries = 5
         while True:
-            if lfnList:
-                # If list is exhausted, exit
-                if offsetToApply >= len(lfnList):
-                    break
-                # Apply the offset to the list of LFNs
-                condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
-                # No limit and no offset as the list is limited already
-                res = rpcClient.getTransformationFiles(condDict, older, newer, timeStamp, orderAttribute, None, None)
-            else:
-                res = rpcClient.getTransformationFiles(
-                    condDict, older, newer, timeStamp, orderAttribute, limit, offsetToApply
-                )
+            # If list is exhausted, exit
+            if offsetToApply >= len(lfnList):
+                break
+            # Apply the offset to the list of LFNs
+            condDict["LFN"] = lfnList[offsetToApply : offsetToApply + limit]
+            # No limit and no offset as the list is limited already
+            res = rpcClient.getTransformationFiles(
+                condDict, older, newer, timeStamp, orderAttribute, None, None, columns
+            )
             if not res["OK"]:
                 gLogger.error(
                     "Error getting files for transformation %s (offset %d), %s"
@@ -211,36 +217,19 @@ def getTransformationFiles(
             if retries:
                 continue
             return res
-            else:
-                condDictStr = str(condDict)
-                log = gLogger.debug if len(condDictStr) > 100 else gLogger.verbose
-                if not log(
-                    "For conditions %s: result for limit %d, offset %d: %d files"
-                    % (condDictStr, limit, offsetToApply, len(res["Value"]))
-                ):
-                    gLogger.verbose(
-                        "For condition keys %s (trans %s): result for limit %d, offset %d: %d files"
-                        % (
-                            str(sorted(condDict)),
-                            condDict.get("TransformationID", "None"),
-                            limit,
-                            offsetToApply,
-                            len(res["Value"]),
-                        )
-                    )
-                if res["Value"]:
-                    transformationFiles += res["Value"]
-                    # Limit the number of files returned
-                    if maxfiles and len(transformationFiles) >= maxfiles:
-                        transformationFiles = transformationFiles[:maxfiles]
-                        break
-                # Less data than requested, exit only if LFNs were not given
-                if not lfnList and len(res["Value"]) < limit:
-                    break
-                offsetToApply += limit
-                # Reset number of retries for next chunk
-                retries = 5
+            gLogger.verbose(f"Result for limit {limit}, offset {offsetToApply}: {len(res['Value'])} files")
+            if res["Value"]:
+                transformationFiles += res["Value"]
+                # Limit the number of files returned
+                if maxfiles and len(transformationFiles) >= maxfiles:
+                    transformationFiles = transformationFiles[:maxfiles]
+                    break
+
+            # Less data than requested, exit only if LFNs were not given
+            if not lfnList and len(res["Value"]) < limit:
+                break
+            offsetToApply += limit
+            # Reset number of retries for next chunk
+            retries = 5
         return S_OK(transformationFiles)
 
     def getTransformationTasks(
```
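For context, a minimal usage sketch of the reworked client call. It assumes an initialized DIRAC client environment; the transformation ID and LFN are placeholders, while the `columns` keyword mirrors the integration test further down.

```python
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient

transClient = TransformationClient()

# Without an "LFN" condition the client now issues a single RPC call and can
# restrict the returned columns server-side instead of fetching everything.
res = transClient.getTransformationFiles({"TransformationID": 123}, columns=["LFN", "Status"])
if res["OK"]:
    for tFile in res["Value"]:
        print(tFile["LFN"], tFile["Status"])

# With an "LFN" condition the client still batches the request itself,
# defaulting to chunks of 1000 LFNs per RPC call.
res = transClient.getTransformationFiles({"TransformationID": 123, "LFN": ["/aa/lfn.1.txt"]})
```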
src/DIRAC/TransformationSystem/DB/TransformationDB.py (32 additions, 43 deletions)
```diff
@@ -594,60 +594,49 @@ def getTransformationFiles(
         limit=None,
         offset=None,
         connection=False,
+        columns=None,
     ):
         """Get files for the supplied transformations with support for the web standard structure"""
         connection = self.__getConnection(connection)
-        req = f"SELECT {intListToString(self.TRANSFILEPARAMS)} FROM TransformationFiles"
-        originalFileIDs = {}
         if condDict is None:
             condDict = {}
-        if condDict or older or newer:
-            lfns = condDict.pop("LFN", None)
-            if lfns:
-                if isinstance(lfns, str):
-                    lfns = [lfns]
-                res = self.__getFileIDsForLfns(lfns, connection=connection)
-                if not res["OK"]:
-                    return res
-                originalFileIDs = res["Value"][0]
-                condDict["FileID"] = list(originalFileIDs)
 
         for val in condDict.values():
             if not val:
                 return S_OK([])
 
+        all_columns = ["LFN"] + self.TRANSFILEPARAMS
+        if columns is None:
+            columns = all_columns
+        elif not set(columns).issubset(all_columns):
+            return S_ERROR(f"Invalid columns requested, valid columns are: {all_columns}")
+
+        req = ", ".join(f"df.{x}" if x == "LFN" else f"tf.{x}" for x in columns)
+        req = f"SELECT {req} FROM TransformationFiles tf"
+        if "LFN" in columns or (condDict and "LFN" in condDict):
+            req = f"{req} JOIN DataFiles df ON tf.FileID = df.FileID"
+
+        fixedCondDict = {}
+        if condDict:
+            for key, value in condDict.items():
+                if key in self.TRANSFILEPARAMS:
+                    fixedCondDict[f"tf.{key}"] = value
+                elif key in ["LFN"]:
+                    fixedCondDict[f"df.{key}"] = value
+                else:
+                    return S_ERROR(f"Invalid key {key} in condDict")
+        if timeStamp:
+            timeStamp = f"tf.{timeStamp}"
+        if fixedCondDict or older or newer:
+            cond = self.buildCondition(fixedCondDict, older, newer, timeStamp, orderAttribute, limit, offset=offset)
+            # When buildCondition tries to quote the column names, it will fail due to the table alias
+            # So we need to move the single quotes to the right place
+            req += f" {cond.replace('`tf.', 'tf.`').replace('`df.', 'df.`')}"
 
-        req = "{} {}".format(
-            req,
-            self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset),
-        )
         res = self._query(req, conn=connection)
         if not res["OK"]:
             return res
 
-        transFiles = res["Value"]
-        fileIDs = [int(row[1]) for row in transFiles]
-        webList = []
-        resultList = []
-        if not fileIDs:
-            originalFileIDs = {}
-        else:
-            if not originalFileIDs:
-                res = self.__getLfnsForFileIDs(fileIDs, connection=connection)
-                if not res["OK"]:
-                    return res
-                originalFileIDs = res["Value"][1]
-            for row in transFiles:
-                lfn = originalFileIDs[row[1]]
-                # Prepare the structure for the web
-                fDict = {"LFN": lfn}
-                fDict.update(dict(zip(self.TRANSFILEPARAMS, row)))
-                # Note: the line below is returning "None" if the item is None... This seems to work but is ugly...
-                rList = [lfn] + [str(item) if not isinstance(item, int) else item for item in row]
-                webList.append(rList)
-                resultList.append(fDict)
+        resultList = [dict(zip(columns, row)) for row in res["Value"]]
+        webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]]
 
         result = S_OK(resultList)
         result["Records"] = webList
-        result["ParameterNames"] = ["LFN"] + self.TRANSFILEPARAMS
+        result["ParameterNames"] = columns
        return result
 
     def getFileSummary(self, lfns, connection=False):
```
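The comment about `buildCondition` above is the subtle part of this change: `buildCondition` backtick-quotes each column name in full, so an aliased column like `tf.TransformationID` would come out quoted as one identifier, which MySQL rejects. A standalone sketch of the string fix, using a hypothetical condition string rather than real `buildCondition` output:

```python
# Hypothetical buildCondition-style output: the table alias is trapped inside the quotes.
cond = "WHERE `tf.TransformationID` = 1 AND `df.LFN` = '/aa/lfn.1.txt'"

# Move the opening backtick past the alias so MySQL sees tf.`TransformationID` etc.
fixed = cond.replace("`tf.", "tf.`").replace("`df.", "df.`")
print(fixed)  # WHERE tf.`TransformationID` = 1 AND df.`LFN` = '/aa/lfn.1.txt'

# The generated query then has this shape (illustrative only):
# SELECT df.LFN, tf.Status FROM TransformationFiles tf
#   JOIN DataFiles df ON tf.FileID = df.FileID
#   WHERE tf.`TransformationID` = 1
```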
src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py

```diff
@@ -289,6 +289,7 @@ def export_getTransformationFiles(
         orderAttribute=None,
         limit=None,
         offset=None,
+        columns=None,
     ):
         if not condDict:
             condDict = {}
@@ -301,6 +302,7 @@ def export_getTransformationFiles(
             limit=limit,
             offset=offset,
             connection=False,
+            columns=columns,
         )
 
     ####################################################################
```
```diff
@@ -114,6 +114,17 @@ def test_addTasksAndFiles(self):
         for f in res["Value"]:
             self.assertEqual(f["Status"], TransformationFilesStatus.ASSIGNED)
 
+        # make sure we can selectively select LFNs
+        res = self.transClient.getTransformationFiles({"TransformationID": transID, "LFN": ["/aa/lfn.1.txt"]})
+        assert res["OK"], res
+        assert len(res["Value"]) == 1, res
+        assert "TargetSE" in res["Value"][0].keys(), res
+
+        # make sure we can selectively select columns
+        res = self.transClient.getTransformationFiles({"TransformationID": transID}, columns=["LFN", "Status"])
+        assert res["OK"], res
+        assert sorted(res["Value"][0]) == ["LFN", "Status"], res
+
         # now adding a new Transformation with new tasks, and introducing a mix of insertion,
         # to test that the trigger works as it should
         res = self.transClient.addTransformation(
```
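The new tests cover the happy paths; the validation added in TransformationDB also means an unknown column is rejected rather than silently ignored. A sketch of that error path, continuing the test above with a deliberately invalid column name:

```python
# "NotAColumn" is not in ["LFN"] + TRANSFILEPARAMS, so the DB layer returns
# S_ERROR and the client passes the failure straight through.
res = self.transClient.getTransformationFiles({"TransformationID": transID}, columns=["LFN", "NotAColumn"])
assert not res["OK"], res
```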