Skip to content

Commit

Permalink
Merge pull request #70 from stuartcampbell/sync-commissioning-proposals
Browse files Browse the repository at this point in the history
Add synchronization of commissioning proposals
  • Loading branch information
padraic-shafer committed May 2, 2024
2 parents a20969c + 5d5d3c2 commit a2de1f8
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 39 deletions.
21 changes: 21 additions & 0 deletions src/nsls2api/services/facility_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ async def current_operating_cycle(facility: str) -> Optional[str]:
return cycle.name


async def cycle_year(
cycle_name: str, facility_name: FacilityName = FacilityName.nsls2
) -> Optional[str]:
"""
Cycle Year
This method retrieves the year for a given cycle.
:param cycle_name: The cycle name (str).
:param facility_name: The facility name (FacilityName).
:return: The year (str) or None if no year is found.
"""
cycle = await Cycle.find_one(
Cycle.name == cycle_name, Cycle.facility == facility_name
)
if cycle is None:
return None

return cycle.year


async def is_healthy(facility: str) -> bool:
"""
Database Health Check
Expand Down
47 changes: 42 additions & 5 deletions src/nsls2api/services/pass_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,33 @@ async def get_saf_from_proposal(
return saf_list


async def get_commissioning_proposals_by_year(year: int):
async def get_commissioning_proposals_by_year(year: int, facility: FacilityName = FacilityName.nsls2) -> Optional[list[PassProposal]]:

pass_facility = await facility_service.pass_id_for_facility(facility)
if not pass_facility:
error_message: str = f"Facility {facility} does not have a PASS ID."
logger.error(error_message)
raise PassException(error_message)

# The PASS ID for commissioning proposals is 300005
url = f"{base_url}Proposal/GetProposalsByType/{api_key}/NSLS-II/{year}/300005/NULL"
proposals = await _call_pass_webservice(url)
return proposals
url = f"{base_url}/Proposal/GetProposalsByType/{api_key}/{pass_facility}/{year}/300005/NULL"

try:
pass_commissioning_proposals = await _call_pass_webservice(url)
commissioning_proposal_list = []
if pass_commissioning_proposals and len(pass_commissioning_proposals) > 0:
for commissioning_proposal in pass_commissioning_proposals:
commissioning_proposal_list.append(PassProposal(**commissioning_proposal))
except ValidationError as error:
error_message = f"Error validating commissioning proposal data recevied from PASS for year {str(year)} at {facility} facility."
logger.error(error_message)
raise PassException(error_message) from error
except Exception as error:
error_message = f"Error retrieving commissioning proposal information from PASS for year {str(year)} at {facility} facility."
logger.exception(error_message)
raise PassException(error_message) from error

return commissioning_proposal_list


async def get_pass_resources():
Expand Down Expand Up @@ -211,7 +233,22 @@ async def get_proposals_allocated(
raise PassException(error_message)

url = f"{base_url}/Proposal/GetProposalsAllocated/{api_key}/{pass_facility}"
allocated_proposals = await _call_pass_webservice(url)

try:
pass_allocated_proposals = await _call_pass_webservice(url)
allocated_proposals = []
if pass_allocated_proposals and len(pass_allocated_proposals) > 0:
for allocation in pass_allocated_proposals:
allocated_proposals.append(PassAllocation(**allocation))
except ValidationError as error:
error_message = f"Error validating allocated proposal data recevied from PASS at {facility} facility."
logger.error(error_message)
raise PassException(error_message) from error
except Exception as error:
error_message = f"Error retrieving allocated proposal information from PASS at {facility} facility."
logger.exception(error_message)
raise PassException(error_message) from error

return allocated_proposals


Expand Down
82 changes: 48 additions & 34 deletions src/nsls2api/services/proposal_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ async def recently_updated(count=5, beamline: str | None = None):
# return result


async def fetch_proposals_for_cycle(cycle: str) -> list[str]:
cycle = await Cycle.find_one(Cycle.name == cycle)
async def fetch_proposals_for_cycle(cycle_name: str) -> list[str]:
cycle = await Cycle.find_one(Cycle.name == cycle_name)
if cycle is None:
raise LookupError(f"Cycle {cycle} not found")
raise LookupError(f"Cycle {cycle} not found in local database.")
return cycle.proposals


Expand Down Expand Up @@ -549,7 +549,7 @@ async def synchronize_proposal_from_pass(proposal_id: int) -> None:
user_list.append(pi_info)

data_session = generate_data_session_for_proposal(proposal_id)

proposal = Proposal(
proposal_id=str(pass_proposal.Proposal_ID),
title=pass_proposal.Title,
Expand Down Expand Up @@ -581,6 +581,30 @@ async def synchronize_proposal_from_pass(proposal_id: int) -> None:
logger.debug(f"Response: {response}")


async def update_proposals_with_cycle(cycle_name: str) -> None:
"""
Update the cycle <-> proposals mapping for the given cycle.
:param cycle_name: The name of the cycle to process proposals for.
:type cycle_name: str
"""

proposal_list = await fetch_proposals_for_cycle(cycle_name)

logger.info(f"Found {len(proposal_list)} proposals for cycle {cycle_name}.")

for proposal_id in proposal_list:
# Add the cycle to the Proposal object

try:
proposal = await proposal_by_id(int(proposal_id))
await proposal.update(AddToSet({Proposal.cycles: cycle_name}))
proposal.last_updated = datetime.datetime.now()
await proposal.save()
except LookupError as error:
logger.warning(error)


async def worker_synchronize_proposal_from_pass(proposal_id: int) -> None:
start_time = datetime.datetime.now()

Expand All @@ -595,44 +619,32 @@ async def worker_synchronize_proposal_from_pass(proposal_id: int) -> None:
async def worker_synchronize_proposals_for_cycle_from_pass(cycle: str) -> None:
start_time = datetime.datetime.now()

cycle_year = await facility_service.cycle_year(cycle)

proposals = await fetch_proposals_for_cycle(cycle)
logger.info(f"Synchronizing {len(proposals)} proposals for {cycle} cycle.")

for proposal_id in proposals:
logger.info(f"Synchronizing proposal {proposal_id}.")
await synchronize_proposal_from_pass(proposal_id)

time_taken = datetime.datetime.now() - start_time
commissioning_proposals: list[
PassProposal
] = await pass_service.get_commissioning_proposals_by_year(cycle_year)
logger.info(
f"Proposals for the {cycle} cycle synchronized in {time_taken.total_seconds():,.0f} seconds"
f"Synchronizing {len(proposals)} commissioning proposals for the year {cycle_year}."
)
for proposal in commissioning_proposals:
logger.info(f"Synchronizing commissioning proposal {proposal.Proposal_ID}.")
await synchronize_proposal_from_pass(proposal.Proposal_ID)

# Now update the cycle information for each proposal
await update_proposals_with_cycle(cycle)


async def update_proposal_to_cycle_mapping_from_pass(cycle: Cycle) -> None:
"""
Update the cycle <-> proposals mapping for the given cycle.
:param cycle: The cycle to process proposals for.
:type cycle: Cycle
"""

allocations = await pass_service.get_proposals_allocated_by_cycle(cycle.name)

for allocation in allocations:
# Add the proposal to the Cycle object
await cycle.update(AddToSet({Cycle.proposals: str(allocation.Proposal_ID)}))
cycle.last_updated = datetime.datetime.now()
await cycle.save()

# Add the cycle to the Proposal object
try:
proposal = await proposal_by_id(allocation.Proposal_ID)
await proposal.update(AddToSet({Proposal.cycles: cycle.name}))
proposal.last_updated = datetime.datetime.now()
await proposal.save()
except LookupError as error:
logger.warning(error)
time_taken = datetime.datetime.now() - start_time
logger.info(
f"Proposals for the {cycle} cycle synchronized in {time_taken.total_seconds():,.0f} seconds"
)


async def worker_update_proposal_to_cycle_mapping(
Expand All @@ -642,11 +654,13 @@ async def worker_update_proposal_to_cycle_mapping(
) -> None:
start_time = datetime.datetime.now()

#TODO: Add test that cycle and facility combination is valid
# TODO: Add test that cycle and facility combination is valid

if cycle:
# If we've specified a cycle then only sync that one
cycles = await Cycle.find(Cycle.name == str(cycle), Cycle.facility == facility).to_list()
cycles = await Cycle.find(
Cycle.name == str(cycle), Cycle.facility == facility
).to_list()
else:
cycles = await Cycle.find(Cycle.facility == facility).to_list()

Expand All @@ -655,7 +669,7 @@ async def worker_update_proposal_to_cycle_mapping(
logger.info(
f"Updating proposals with information for cycle {individual_cycle.name} (from PASS)"
)
await update_proposal_to_cycle_mapping_from_pass(individual_cycle)
await update_proposals_with_cycle(individual_cycle)

time_taken = datetime.datetime.now() - start_time
logger.info(
Expand Down

0 comments on commit a2de1f8

Please sign in to comment.