Add support for missing datastore records in trust mode for retrieveArtifacts

This allows Zip retrieval to work in QBB mode.
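For context, a minimal illustrative sketch of the kind of call this change enables (repo path, collection, and dataset type are hypothetical; the exact quantum-backed butler entry point differs):

from lsst.daf.butler import Butler

# Hypothetical repo path and collection; a quantum-backed butler runs its
# datastore in trust mode, with no datastore records held in a registry.
butler = Butler("/repo/example", collections=["DECam/runs/example"])
refs = butler.registry.queryDatasets("raw", instrument="DECam")

# Previously a trust-mode datastore raised ValueError for any ref that
# had no datastore record; with this change the expected file locations
# are computed and checked for existence, so retrieval (including the
# Zip path) can proceed.
butler.retrieveArtifacts(refs, destination="./artifacts", transfer="copy")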
timj committed Oct 23, 2024
1 parent 6ebf8be commit 03874b4
Showing 1 changed file with 81 additions and 49 deletions.
130 changes: 81 additions & 49 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1976,6 +1976,71 @@ def _locations_to_URI(

        return uris

    @staticmethod
    def _find_missing_records(
        datastore: FileDatastore,
        refs: Iterable[DatasetRef],
        missing_ids: set[DatasetId],
        artifact_existence: dict[ResourcePath, bool] | None = None,
    ) -> dict[DatasetId, list[StoredFileInfo]]:
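        """Find file records for refs unknown to the datastore's records.

        Asks the given datastore for the expected locations of each
        missing dataset, checks whether those artifacts actually exist,
        and returns the records for the datasets that do.
        """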
        if not missing_ids:
            return {}

        if artifact_existence is None:
            artifact_existence = {}

        found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
        id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}

        # This should be chunked in case we end up having to check
        # the file store since we need some log output to show
        # progress.
        for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000):
            records = {}
            for missing in missing_ids_chunk:
                # Ask the source datastore where the missing artifacts
                # should be. An execution butler might not know about the
                # artifacts even if they are there.
                expected = datastore._get_expected_dataset_locations_info(id_to_ref[missing])
                records[missing] = [info for _, info in expected]

            # Call the mexists helper method in case we have not already
            # checked these artifacts such that artifact_existence is
            # empty. This allows us to benefit from parallelism.
            # datastore.mexists() itself does not give us access to the
            # derived datastore record.
            log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
            ref_exists = datastore._process_mexists_records(
                id_to_ref, records, False, artifact_existence=artifact_existence
            )

            # Now go through the records and propagate the ones that exist.
            location_factory = datastore.locationFactory
            for missing, record_list in records.items():
                # Skip completely if the ref does not exist.
                ref = id_to_ref[missing]
                if not ref_exists[ref]:
                    log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
                    continue
                # Check for file artifact to decide which parts of a
                # disassembled composite do exist. If there is only a
                # single record we don't even need to look because it can't
                # be a composite and must exist.
                if len(record_list) == 1:
                    dataset_records = record_list
                else:
                    dataset_records = [
                        record
                        for record in record_list
                        if artifact_existence[record.file_location(location_factory).uri]
                    ]
                    assert len(dataset_records) > 0, "Disassembled composite should have had some files."

                # Rely on found_records being a defaultdict.
                found_records[missing].extend(dataset_records)
        log.verbose("Completed scan for missing data files")
        return found_records
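A minimal, stdlib-only sketch of the chunked existence-check pattern this helper uses; the placeholder callables stand in for the datastore APIs, and only chunk_iterable mirrors a real lsst.utils helper:

from collections import defaultdict
from itertools import islice
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")

def chunk_iterable(data: Iterable[T], chunk_size: int) -> Iterator[list[T]]:
    # Simplified stand-in for lsst.utils.iteration.chunk_iterable.
    it = iter(data)
    while chunk := list(islice(it, chunk_size)):
        yield chunk

def find_existing(
    ids: set[str],
    expected_locations: Callable[[str], list[str]],
    exists: Callable[[list[str]], dict[str, bool]],
) -> dict[str, list[str]]:
    # expected_locations(id) -> candidate URIs for a dataset;
    # exists(uris) -> {uri: bool}, checked in bulk per chunk so the
    # back-end existence checks can run in parallel.
    found: dict[str, list[str]] = defaultdict(list)
    for chunk in chunk_iterable(ids, chunk_size=10_000):
        candidates = {i: expected_locations(i) for i in chunk}
        existence = exists([uri for uris in candidates.values() for uri in uris])
        for i, uris in candidates.items():
            if present := [uri for uri in uris if existence[uri]]:
                found[i].extend(present)
    return found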

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
@@ -2037,6 +2102,18 @@ def retrieveArtifacts(
        # Retrieve all the records in bulk indexed by ref.id.
        records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)

        # Check for missing records.
        known_ids = set(records)
        log.debug("Number of datastore records found in database: %d", len(known_ids))
        requested_ids = {ref.id for ref in refs}
        missing_ids = requested_ids - known_ids

        if missing_ids and not self.trustGetRequest:
            raise ValueError(f"Number of datasets missing from this datastore: {len(missing_ids)}")

        missing_records = self._find_missing_records(self, refs, missing_ids)
        records.update(missing_records)

        # One artifact can be used by multiple DatasetRef.
        # e.g. DECam.
        artifact_to_ref_id: dict[ResourcePath, list[DatasetId]] = defaultdict(list)
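The hunk ends where the method starts grouping refs by artifact: several DatasetRefs can point at one file, as the DECam comment notes for multi-detector raws. A toy illustration of that inversion, with made-up URIs and IDs:

from collections import defaultdict

# Hypothetical (uri, dataset_id) pairs derived from datastore records.
pairs = [
    ("s3://bucket/decam/c4d_0001.fits.fz", "id-det01"),
    ("s3://bucket/decam/c4d_0001.fits.fz", "id-det02"),
    ("s3://bucket/decam/c4d_0002.fits.fz", "id-det03"),
]

artifact_to_ref_id = defaultdict(list)
for uri, dataset_id in pairs:
    artifact_to_ref_id[uri].append(dataset_id)

# Each artifact is retrieved once, however many refs share it.
assert len(artifact_to_ref_id["s3://bucket/decam/c4d_0001.fits.fz"]) == 2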
@@ -2605,55 +2682,10 @@ def transfer_from(
                len(missing_ids),
                len(requested_ids),
            )
            id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}

            # This should be chunked in case we end up having to check
            # the file store since we need some log output to show
            # progress.
            for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000):
                records = {}
                for missing in missing_ids_chunk:
                    # Ask the source datastore where the missing artifacts
                    # should be. An execution butler might not know about the
                    # artifacts even if they are there.
                    expected = source_datastore._get_expected_dataset_locations_info(id_to_ref[missing])
                    records[missing] = [info for _, info in expected]

                # Call the mexist helper method in case we have not already
                # checked these artifacts such that artifact_existence is
                # empty. This allows us to benefit from parallelism.
                # datastore.mexists() itself does not give us access to the
                # derived datastore record.
                log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
                ref_exists = source_datastore._process_mexists_records(
                    id_to_ref, records, False, artifact_existence=artifact_existence
                )

                # Now go through the records and propagate the ones that exist.
                location_factory = source_datastore.locationFactory
                for missing, record_list in records.items():
                    # Skip completely if the ref does not exist.
                    ref = id_to_ref[missing]
                    if not ref_exists[ref]:
                        log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
                        continue
                    # Check for file artifact to decide which parts of a
                    # disassembled composite do exist. If there is only a
                    # single record we don't even need to look because it can't
                    # be a composite and must exist.
                    if len(record_list) == 1:
                        dataset_records = record_list
                    else:
                        dataset_records = [
                            record
                            for record in record_list
                            if artifact_existence[record.file_location(location_factory).uri]
                        ]
                        assert len(dataset_records) > 0, "Disassembled composite should have had some files."

                    # Rely on source_records being a defaultdict.
                    source_records[missing].extend(dataset_records)
            log.verbose("Completed scan for missing data files")
            found_records = self._find_missing_records(
                source_datastore, refs, missing_ids, artifact_existence
            )
            source_records.update(found_records)

        # See if we already have these records
        target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
