Skip to content

Commit

Permalink
Export resources from API endpoint tand write to csv file
Browse files Browse the repository at this point in the history
  • Loading branch information
TraciebelWairimu committed Feb 22, 2024
1 parent 62caadd commit 610a82a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
8 changes: 8 additions & 0 deletions importer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,11 @@ The coverage report `coverage.html` will be at the working directory
- The script will check to see if every user has a keycloak uuid that has a Practitioner uuid that matches the one provided in the csv file
- Note that if none of the Practitioner uuids match then all will be deleted
- Set `cascade_delete` to True or False if you would like to automatically delete any linked resources. If you set it to False, and there are any linked resources, then the resources will NOT be deleted

## 10. Export resources from API endpoint to CSV file
- Run `python3 main.py --export_resources True --parameter _lastUpdated --value gt2023-08-01 --batch_size 20 --resource_type locations --log_level info`
- `export_resources` can either be True or False, checks if it is True and exports the resources
- The `parameter` is used as a filter for the resources "e.g _lastUpdated", and `value` is where you pass the actual parameter value "e.g gt2023-08-01" used to filter the resources
- The `batch_size` is the number of resources handled/processed at a time
- Specify the `resource_type` you want to export, different resource_types are exported to different csv_files
- The csv_file containing the exported resources is labelled using the current time, to know when the resources were exported for example, csv/2024-02-21-12-21-export_Location.csv
83 changes: 81 additions & 2 deletions importer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

global_access_token = ""


# This function takes in a csv file
# reads it and returns a list of strings/lines
# It ignores the first line (assumes headers)
Expand Down Expand Up @@ -861,8 +862,75 @@ def clean_duplicates(users, cascade_delete):
logging.info("No Practitioners found")


# Create a csv file and initialize the CSV writer
def write_csv(data, resource_type, fieldnames):
logging.info("Writing to csv file")
current_time = datetime.now().strftime("%Y-%m-%d-%H-%M")
csv_file = f"csv/{current_time}-export_{resource_type}.csv"
with open(csv_file, 'w', newline='') as file:
csv_writer = csv.writer(file)
csv_writer.writerow(fieldnames)
csv_writer.writerows(data)


# This function exports resources from the API to a csv file
def export_resources_to_csv(resource_type, parameter, value, batch_size):
resource_type = get_valid_resource_type(resource_type)
resource_url = "/".join([config.fhir_base_url, resource_type])
if len(parameter) > 0:
resource_url = resource_url + "?" + parameter + "=" + value + "&_count=" + str(batch_size)
response = handle_request("GET", "", resource_url)
if response[1] == 200:
resources = json.loads(response[0])
data = []
if resource_type == "Location":
elements = ["name", "status", "id", "identifier", "parentName", "parentID", "type", "typeCode",
"physicalType", "physicalTypeCode"]
elif resource_type == "Organization":
elements = ["name", "active", "id", "identifier", "alias"]
elif resource_type == "CareTeam":
elements = ["name", "status", "id", "identifier", " organizations", "participants"]
else:
elements = []
for x in resources["entry"]:
rl = []
for element in elements:
try:
if element == "active":
value = x["resource"]["active"]
elif element == "identifier":
value = x["resource"]["identifier"][0]["value"]
elif element == "parentName":
value = x["resource"]["partOf"]["display"]
elif element == "parentID":
value = x["resource"]["partOf"]["reference"]
elif element == "type":
value = x["resource"]["type"][0]["coding"][0]["display"]
elif element == "typeCode":
value = x["resource"]["type"][0]["coding"][0]["code"]
elif element == "physicalType":
value = x["resource"]["physicalType"]["coding"][0]["display"]
elif element == "physicalTypeCode":
value = x["resource"]["physicalType"]["coding"][0]["code"]
elif element == "alias":
value = x["resource"]["alias"][0]
else:
value = x["resource"][element]
except KeyError:
value = ""
rl.append(value)
data.append(rl)
write_csv(data, resource_type, elements)
else:
logging.error(f"Failed to retrieve resource. Status code: {response[1]}")


@click.command()
@click.option("--csv_file", required=True)
@click.option("--csv_file", required=False)
@click.option("--export_resources", required=False)
@click.option("--parameter", required=False)
@click.option("--value", required=False)
@click.option("--batch_size", required=False)
@click.option("--access_token", required=False)
@click.option("--resource_type", required=False)
@click.option("--assign", required=False)
Expand All @@ -874,7 +942,10 @@ def clean_duplicates(users, cascade_delete):
"--log_level", type=click.Choice(["DEBUG", "INFO", "ERROR"], case_sensitive=False)
)
def main(
csv_file, access_token, resource_type, assign, setup, group, roles_max, cascade_delete, log_level
csv_file, export_resources, parameter, value, batch_size, access_token, resource_type, assign, setup, group,
roles_max,
cascade_delete,
log_level
):
if log_level == "DEBUG":
logging.basicConfig(level=logging.DEBUG)
Expand All @@ -886,6 +957,13 @@ def main(
start_time = datetime.now()
logging.info("Start time: " + start_time.strftime("%H:%M:%S"))

logging.info("Starting export...")
if export_resources == "True":
logging.info("Exporting " + resource_type)
export_resources_to_csv(resource_type, parameter, value, batch_size)
logging.info("Successfully written to csv")
exit()

# set access token
if access_token:
global global_access_token
Expand Down Expand Up @@ -974,5 +1052,6 @@ def main(
total_time = end_time - start_time
logging.info("Total time: " + str(total_time.total_seconds()) + " seconds")


if __name__ == "__main__":
main()
52 changes: 52 additions & 0 deletions importer/test_main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import json
import unittest
from datetime import datetime
from jsonschema import validate
from mock import patch
from main import (
read_csv,
write_csv,
build_payload,
build_org_affiliation,
extract_matches,
create_user_resources,
export_resources_to_csv,
)


Expand All @@ -18,6 +21,23 @@ def test_read_csv(self):
self.assertIsInstance(records, list)
self.assertEqual(len(records), 3)

def test_write_csv(self):
self.test_data = [
["e2e-mom", "True", "caffe509-ae56-4d42-945e-7b4c161723d1", "d93ae7c3-73c0-43d1-9046-425a3466ecec",
"handy"],
["e2e-skate", "True", "2d4feac9-9ab5-4585-9b33-e5abd14ceb0f", "58605ed8-7217-4bf3-8122-229b6f47fa64",
"foolish"
]
]
self.test_resource_type = "test_organization"
self.test_fieldnames = ["name", "active", "id", "identifier", "alias"]
write_csv(self.test_data, self.test_resource_type, self.test_fieldnames)
self.assertIsInstance(self.test_data, list)
self.assertEqual(len(self.test_data), 2)
current_time = datetime.now().strftime("%Y-%m-%d-%H-%M")
expected_csv_file_path = f"csv/{current_time}-export_{self.test_resource_type}.csv"
self.assertTrue(expected_csv_file_path, "CSV file created in expected location")

@patch("main.get_resource")
def test_build_payload_organizations(self, mock_get_resource):
mock_get_resource.return_value = "1"
Expand Down Expand Up @@ -390,6 +410,38 @@ def test_update_resource_with_non_existing_id_fails(self, mock_get_resource):
"Trying to update a Non-existent resource", str(raised_error.exception)
)

@patch("main.handle_request")
@patch("main.write_csv")
def test_export_resources_to_csv(self, mock_write_csv, mock_handle_request):
mock_response_data = {
"entry": [
{
"resource": {
"name": "City1",
"status": "active",
"id": "ba787982-b973-4bd5-854e-eacbe161e297",
"identifier": [{"value": "ba787 982-b973-4bd5-854e-eacbe161e297"}],
"partOf": {"display": "test location-1", "reference": "Location/18fcbc2e-4240-4a84-a270"
"-7a444523d7b6"},
"type": [{"coding": [{"display": "Jurisdiction", "code": "jdn"}]}],
"physicalType": {"coding": [{"display": "Jurisdiction", "code": "jdn"}]},
}
}
]
}
mock_handle_request.return_value = (json.dumps(mock_response_data), 200)
test_data = [
['City1', 'active', 'ba787982-b973-4bd5-854e-eacbe161e297', 'ba787 982-b973-4bd5-854e-eacbe161e297',
'test location-1', 'Location/18fcbc2e-4240-4a84-a270-7a444523d7b6', 'Jurisdiction', 'jdn',
'Jurisdiction', 'jdn']]

test_elements = ["name", "status", "id", "identifier", "parentName", "parentID", "type", "typeCode",
"physicalType", "physicalTypeCode"]
export_resources_to_csv("locations", "parameter", "value", 10)
resource_type = "Location"

mock_write_csv.assert_called_once_with(test_data, resource_type, test_elements)


if __name__ == "__main__":
unittest.main()

0 comments on commit 610a82a

Please sign in to comment.