Skip to content

Commit

Permalink
Merge pull request DIRACGrid#8069 from DIRACGridBot/cherry-pick-2-987…
Browse files Browse the repository at this point in the history
…2ba6fc-integration

[sweep:integration] feat: method findFileByMetadata
  • Loading branch information
fstagni authored Mar 4, 2025
2 parents 87f2d23 + f725a81 commit beafcc5
Show file tree
Hide file tree
Showing 2 changed files with 375 additions and 0 deletions.
194 changes: 194 additions & 0 deletions src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"resolveDataset",
"getLFNForPFN",
"getUserDirectory",
"getFileUserMetadata",
"findFilesByMetadata",
]

WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [
Expand All @@ -78,13 +80,15 @@ class RucioFileCatalogClient(FileCatalogClientBase):
"createDataset",
"changePathOwner",
"changePathMode",
"setMetadata",
]

NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [
"getUserDirectory",
"createUserDirectory",
"createUserMapping",
"removeUserDirectory",
"findFilesByMetadata",
]

ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [
Expand Down Expand Up @@ -697,3 +701,193 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
except Exception as err:
return S_ERROR(str(err))
return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadata(self, path):
    """Get the metadata attached to a file, including metadata inherited from its parents.

    :param path: iterable of LFNs; only the first entry is looked up
                 (presumably the container built by ``checkCatalogArguments`` -- confirm).
    :return: S_OK wrapping ``{"Successful": {lfn: metadata}, "Failed": {lfn: reason}}``,
             or S_ERROR carrying the text of any unexpected exception.
    """
    path = next(iter(path))
    resDict = {"Successful": {}, "Failed": {}}
    try:
        did = self.__getDidsFromLfn(path)
        # Use a default for next(): an empty result iterator would otherwise raise
        # StopIteration, which the generic handler below would turn into an S_ERROR
        # with an empty message instead of a per-path failure.
        meta = next(self.client.get_metadata_bulk(dids=[did], inherit=True, plugin="ALL"), None)
        if meta is None:
            resDict["Failed"][path] = "No such file or directory"
        elif meta["did_type"] == "FILE":  # Should we also return the metadata for the directories ?
            resDict["Successful"][path] = meta
        else:
            resDict["Failed"][path] = "Not a file"
    except DataIdentifierNotFound:
        resDict["Failed"][path] = "No such file or directory"
    except Exception as err:
        return S_ERROR(str(err))
    return S_OK(resDict)

@checkCatalogArguments
def getFileUserMetadataBulk(self, lfns):
    """Get the metadata attached to several files, including metadata inherited from parents.

    The LFNs are processed in chunks of 1000. Every record returned by Rucio is stored
    under its ``name`` field; LFNs of a chunk that have no record afterwards are reported
    as failed.

    :param lfns: collection of LFNs to look up
    :return: S_OK wrapping ``{"Successful": {name: metadata}, "Failed": {lfn: reason}}``,
             or S_ERROR carrying the text of the first exception encountered.
    """
    outcome = {"Successful": {}, "Failed": {}}
    found = outcome["Successful"]
    missing = outcome["Failed"]
    for chunk in breakListIntoChunks(lfns, 1000):
        try:
            chunkDids = [self.__getDidsFromLfn(entry) for entry in chunk]
            for record in self.client.get_metadata_bulk(dids=chunkDids, inherit=True):
                found[record["name"]] = record
            # Anything requested in this chunk but not returned is considered missing
            missing.update({entry: "No such file or directory" for entry in chunk if entry not in found})
        except Exception as exc:
            return S_ERROR(str(exc))
    return S_OK(outcome)

@checkCatalogArguments
def setMetadataBulk(self, pathMetadataDict):
    """Add metadata for the given paths in a single bulk request.

    :param pathMetadataDict: mapping ``{lfn: {metadataKey: value, ...}}``
    :return: S_OK wrapping ``{"Successful": {lfn: True}, "Failed": {}}`` when the bulk
             request went through, or S_ERROR carrying the text of the exception raised
             while building the DIDs or while calling Rucio.
    """
    resDict = {"Successful": {}, "Failed": {}}
    dids = []
    for path, metadataDict in pathMetadataDict.items():
        try:
            did = self.__getDidsFromLfn(path)
            did["meta"] = metadataDict
            dids.append(did)
        except Exception as err:
            return S_ERROR(str(err))
    try:
        # Nothing to send for an empty input: skip the remote call altogether
        if dids:
            self.client.set_dids_metadata_bulk(dids=dids, recursive=False)
    except Exception as err:
        return S_ERROR(str(err))
    # The bulk call is all-or-nothing from this method's point of view (any exception
    # returns S_ERROR above), so reaching this point means every path was processed.
    # Previously the result dictionary was returned empty.
    resDict["Successful"] = dict.fromkeys(pathMetadataDict, True)
    return S_OK(resDict)

@checkCatalogArguments
def setMetadata(self, path, metadataDict):
    """Add metadata to a single path by delegating to ``setMetadataBulk``.

    :param path: iterable of LFNs; only the first entry is used
    :param metadataDict: metadata to attach to that path
    :return: whatever ``setMetadataBulk`` returns for the one-entry mapping
    """
    firstPath = next(iter(path))
    return self.setMetadataBulk({firstPath: metadataDict})

@checkCatalogArguments
def removeMetadata(self, path, metadata):
    """Remove the specified metadata keys from the given file.

    :param path: LFN, or an iterable of LFNs of which only the first entry is used
    :param metadata: iterable of metadata keys to delete
    :return: S_OK() when every key was removed; otherwise S_ERROR. When individual keys
             failed, the S_ERROR structure carries an extra ``FailedMetadata`` entry
             mapping each failed key to its error text.
    """
    failedMeta = {}
    try:
        # Other methods of this client unwrap the LFN container with next(iter(path));
        # do the same here, but leave a plain string untouched so it is not reduced
        # to its first character.
        if not isinstance(path, str):
            path = next(iter(path))
        did = self.__getDidsFromLfn(path)
        # TODO : Implement bulk delete_metadata method in Rucio
        for meta in metadata:
            try:
                self.client.delete_metadata(scope=did["scope"], name=did["name"], key=meta)
            except DataIdentifierNotFound:
                return S_ERROR(f"File {path} not found")
            except Exception as err:
                # Keep going: collect per-key failures and report them together
                failedMeta[meta] = str(err)
    except Exception as err:
        return S_ERROR(str(err))

    if failedMeta:
        metaExample = next(iter(failedMeta))
        result = S_ERROR(f"Failed to remove {len(failedMeta)} metadata, e.g. {failedMeta[metaExample]}")
        result["FailedMetadata"] = failedMeta
        # The error structure used to be built and then dropped, so partial
        # failures were reported as success.
        return result
    return S_OK()

def findFilesByMetadata(self, metadataFilterDict, path="/", timeout=120):
    """Find the DIDs matching the given DIRAC metadata query.

    The query is converted into Rucio filters and ``list_dids`` is called once per scope
    in ``self.scopes``; the results of all scopes are concatenated.

    :param metadataFilterDict: DIRAC metadata query dictionary
    :param path: not used by this implementation
    :param timeout: not used by this implementation
    :return: S_OK wrapping the list of matching DIDs, or S_ERROR carrying the text of the
             first exception raised by a ``list_dids`` call.
    """
    rucioFilters = self.__transform_DIRAC_filter_dict_to_Rucio_filter_dict([metadataFilterDict])
    matches = []
    for scope in self.scopes:
        try:
            found = self.client.list_dids(scope=scope, filters=rucioFilters, did_type="all")
            matches.extend(found)
        except Exception as failure:
            return S_ERROR(str(failure))
    return S_OK(matches)

def __transform_DIRAC_operator_to_Rucio(self, DIRAC_dict):
    """
    Convert a DIRAC metadata query dictionary into a Rucio-compatible filter dictionary.

    Plain values are copied unchanged. A value that is itself a dictionary is read as
    ``{operator: operand}`` pairs; each known operator becomes a suffix on the key.
    Operators without a mapping are skipped.

    Example:
    input_dict={'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
    return = {'key1': 'value1', 'key2.gt': 10, 'key3': 10}

    :param DIRAC_dict: DIRAC query dictionary
    :return: dictionary using Rucio key suffixes
    """
    suffixes = {">": ".gt", "<": ".lt", ">=": ".gte", "<=": ".lte", "=<": ".lte", "!=": ".ne", "=": ""}
    translated = {}

    for name, condition in DIRAC_dict.items():
        if not isinstance(condition, dict):
            translated[name] = condition
            continue
        for op, operand in condition.items():
            suffix = suffixes.get(op)
            # "=" maps to an empty suffix, so compare against None rather than truthiness
            if suffix is not None:
                translated[f"{name}{suffix}"] = operand

    return translated

def __transform_dict_with_in_operateur(self, DIRAC_dict_with_in_operator_list):
    """
    Expand the first 'in' operator found in each DIRAC filter dictionary.

    For every dictionary of the input list, the first key whose value is a dictionary
    containing 'in' is replaced by one copy of the dictionary per listed value; all other
    keys are preserved. Dictionaries without any 'in' operator are passed through as-is.
    Only one 'in' key is expanded per dictionary and per call, so the caller has to call
    this method again until the returned flag is False.

    Example (single pass):
    input_dict_list = [{'particle': {'in': ['proton','electron']}, 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}}]
    return = ([{'particle': 'proton', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}},
               {'particle': 'electron', 'site': {'in': ['LaPalma', 'paranal']}, 'configuration_id': {'=': 14}}], True)

    :param DIRAC_dict_with_in_operator_list: list of DIRAC filter dictionaries
    :return: tuple (list of expanded dictionaries, True if any 'in' operator was expanded)
    :raises TypeError: if the argument is not a list, or one of its elements is not a dict
    """
    if not isinstance(DIRAC_dict_with_in_operator_list, list):
        raise TypeError("DIRAC_dict_with_in_operator_list must be a list of dictionaries")

    combined_dict_list = []  # Final list of transformed dictionaries
    break_reached = False  # Tracks whether an 'in' operator was expanded in any dictionary

    for DIRAC_dict_with_in_operator in DIRAC_dict_with_in_operator_list:
        if not isinstance(DIRAC_dict_with_in_operator, dict):
            raise TypeError("Each element in DIRAC_dict_with_in_operator_list must be a dictionary")

        # An explicit flag is used instead of testing the key's truthiness: a falsy key
        # (e.g. "" or 0) would otherwise be reported as found but never expanded, and the
        # caller would loop forever.
        in_found = False
        in_key = None
        in_values = []
        for key, value in DIRAC_dict_with_in_operator.items():
            if isinstance(value, dict) and "in" in value:
                in_found = True
                in_key = key
                in_values = value["in"]
                break

        if not in_found:
            # No 'in' operator: keep the dictionary unchanged
            combined_dict_list.append(DIRAC_dict_with_in_operator)
            continue

        break_reached = True
        for val in in_values:
            # Shallow copy, then replace the 'in' condition by one concrete value
            new_dict = DIRAC_dict_with_in_operator.copy()
            new_dict[in_key] = val
            combined_dict_list.append(new_dict)

    return combined_dict_list, break_reached

def __transform_DIRAC_filter_dict_to_Rucio_filter_dict(self, DIRAC_filter_dict_list):
    """
    Convert a list of DIRAC filter dictionaries into a list of Rucio filter dictionaries.

    The 'in' operators are expanded first, one pass at a time, until a pass reports that
    nothing was left to expand. Each resulting dictionary then has its DIRAC operators
    translated into Rucio key suffixes.

    Example:
    input_dict_list = [{'particle': {'in': ['proton','electron']},'site': {'in': [ "LaPalma", 'paranal']},'configuration_id': {'=': 14} } ]
    return = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'proton', 'site': 'paranal', 'configuration_id': 14}, {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'electron', 'site': 'paranal', 'configuration_id': 14}]

    :param DIRAC_filter_dict_list: list of DIRAC filter dictionaries
    :return: list of Rucio filter dictionaries
    """
    expanded = DIRAC_filter_dict_list
    while True:
        # Always run at least one pass; stop after the first pass that expands nothing
        expanded, had_in = self.__transform_dict_with_in_operateur(expanded)
        if not had_in:
            break
    return [self.__transform_DIRAC_operator_to_Rucio(entry) for entry in expanded]
Loading

0 comments on commit beafcc5

Please sign in to comment.