Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secure API route get_latest_patient_encounter_by_rfid #135

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, Request

from sqlalchemy.orm import Session

Expand All @@ -22,12 +22,14 @@
tags=[MEDICAL],
)
def get_latest_patient_encounter_rfid(
patient_rfid: str,
request: Request,
db: Session = Depends(get_db),
) -> PatientEncounterResponseSchema:
"""
Retrieve a patient encounter from the database with the provided RFID.
"""
patient_rfid = request.headers.get('patient_rfid')

try:
encounter = get_latest_patient_encounter_by_patient_rfid(db, patient_rfid)
except Exception as err:
Expand Down
19 changes: 14 additions & 5 deletions app/api/schemas/patient_encounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from pydantic import BaseModel


class PatientEncounterSchema(BaseModel):
class BasePatientEncounterSchema(BaseModel):
"""
An encounter with a given patient.
A base schema for patient encounters, excluding patient RFID.
"""

age: Optional[int]
arrival_method: str
arrival_date: datetime
Expand All @@ -23,16 +24,24 @@ class PatientEncounterSchema(BaseModel):
handover_too: str
location: str
on_shift: bool
patient_rfid: Optional[str]
qr_code: Optional[str]
triage_acuity: str

class Config:
orm_mode = True

class PatientEncounterResponseSchema(PatientEncounterSchema):

class PatientEncounterSchema(BasePatientEncounterSchema):
"""
An encounter with a given patient, including patient RFID.
"""

patient_rfid: Optional[str]


class PatientEncounterResponseSchema(BasePatientEncounterSchema):
"""
The patient encounter response schema.
The patient encounter response schema, without patient RFID.
"""
# document uuid is created server-side when a document is created
patient_encounter_uuid: Optional[UUID]
Expand Down
16 changes: 8 additions & 8 deletions app/api/tests/test_patient_encounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def test_soft_delete(

@pytest.mark.needs(postgres=True)
def test_get_latest_patient_encounter_by_rfid(
client: TestClient, auth_header: str
client: TestClient, auth_header: dict
) -> None:
"""
Test that we are able to get the latest patient encounter by RFID
Expand All @@ -343,20 +343,20 @@ def test_get_latest_patient_encounter_by_rfid(
response = client.post(f"/api/{MEDICAL}/form", headers=auth_header, json=encounter2)
assert response.status_code == 200, "Unable to submit the second patient encounter."

# Headers now need to include the RFID
headers_with_rfid = {**auth_header, "patient_rfid": valid_rfid}

# Test valid RFID retrieval
doc_url = f"/api/{MEDICAL}/latest-form-rfid?" + urlencode(
{"patient_rfid": valid_rfid}
)
response = client.get(doc_url, headers=auth_header)
response = client.get(f"/api/{MEDICAL}/latest-form-rfid", headers=headers_with_rfid)
resp_data = response.json()
assert response.status_code == 200, "Should have found entry."
assert (
resp_data["document_num"] == encounter2["document_num"]
), "Not the latest patient encounter."

# Test invalid RFID
doc_url = f"/api/{MEDICAL}/latest-form-rfid?" + urlencode(
{"patient_rfid": "invalid_rfid"}
headers_with_invalid_rfid = {**auth_header, "patient_rfid": "invalid_rfid"}
response = client.get(
f"/api/{MEDICAL}/latest-form-rfid", headers=headers_with_invalid_rfid
)
response = client.get(doc_url, headers=auth_header)
assert response.status_code == 404, "Should not have found entry for invalid RFID."
Loading