Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into 129-Export-Resource-uuids
Browse files Browse the repository at this point in the history
TraciebelWairimu authored Feb 28, 2024
2 parents 74d6f73 + 174645a commit 2619ae4
Showing 3 changed files with 129 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@ import org.hl7.fhir.r4.model.Encounter
import org.hl7.fhir.r4.model.EpisodeOfCare
import org.hl7.fhir.r4.model.Group
import org.hl7.fhir.r4.model.Immunization
import org.hl7.fhir.r4.model.ListResource
import org.hl7.fhir.r4.model.Location
import org.hl7.fhir.r4.model.Observation
import org.hl7.fhir.r4.model.Patient
import org.hl7.fhir.r4.model.PlanDefinition
@@ -55,6 +57,8 @@ class TransformSupportServices constructor(private val simpleWorkerContext: Simp
"Task_Output" -> Task.TaskOutputComponent()
"Task_Restriction" -> Task.TaskRestrictionComponent()
"AdverseEvent_SuspectEntity" -> AdverseEvent.AdverseEventSuspectEntityComponent()
"Location_Position" -> Location.LocationPositionComponent()
"List_Entry" -> ListResource.ListEntryComponent()
else -> ResourceFactory.createResourceOrType(name)
}
}
3 changes: 3 additions & 0 deletions importer/README.md
Original file line number Diff line number Diff line change
@@ -34,6 +34,9 @@ and then posts them to the API for creation
3. Create a `config.py` file. The `sample_config.py` is an example of what this should look like. Populate it with the right credentials
4. Run script - `python3 main.py --csv_file csv/locations.csv --resource_type locations`
5. You can turn on logging by passing a `--log_level` to the command line as `info`, `debug` or `error`. For example `python3 main.py --csv_file csv/locations.csv --resource_type locations --log_level info`
6. There is a progress bar that shows the read_csv and build_payload progress as it is going on
7. You can get only the response from the api after the import is done by passing `--only_response true`


See example csvs in the csv folder

197 changes: 122 additions & 75 deletions importer/main.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import click
import requests
import logging
import logging.config
import backoff
from datetime import datetime
from oauthlib.oauth2 import LegacyApplicationClient
@@ -30,8 +31,9 @@ def read_csv(csv_file):
next(records)
all_records = []

for record in records:
all_records.append(record)
with click.progressbar(records, label='Progress::Reading csv ') as read_csv_progress:
for record in read_csv_progress:
all_records.append(record)

logging.info("Returning records from csv file")
return all_records
@@ -77,7 +79,7 @@ def get_access_token():
# to create resources
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=180)
def post_request(request_type, payload, url):
logging.info("Posting request-----------------")
logging.info("Posting request")
logging.info("Request type: " + request_type)
logging.info("Url: " + url)
logging.debug("Payload: " + payload)
@@ -516,63 +518,65 @@ def build_payload(resource_type, resources, resource_payload_file):
with open(resource_payload_file) as json_file:
payload_string = json_file.read()

for resource in resources:
name, status, method, id, *_ = resource
try:
if method == "create":
version = "1"
if len(id.strip()) > 0:
# use the provided id
unique_uuid = id.strip()
identifier_uuid = id.strip()
else:
# generate a new uuid
unique_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))
identifier_uuid = unique_uuid
except IndexError:
# default if method is not provided
unique_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))
identifier_uuid = unique_uuid
version = "1"

try:
if method == "update":
if len(id.strip()) > 0:
version = get_resource(id, resource_type)
if version != "0":
with click.progressbar(resources, label='Progress::Building payload ') as build_payload_progress:
for resource in build_payload_progress:
logging.info("\t")
name, status, method, id, *_ = resource
try:
if method == "create":
version = "1"
if len(id.strip()) > 0:
# use the provided id
unique_uuid = id.strip()
identifier_uuid = id.strip()
else:
logging.info("Failed to get resource!")
raise ValueError("Trying to update a Non-existent resource")
else:
logging.info("The id is required!")
raise ValueError("The id is required to update a resource")
except IndexError:
raise ValueError("The id is required to update a resource")

# ps = payload_string
ps = (
payload_string.replace("$name", name)
.replace("$unique_uuid", unique_uuid)
.replace("$identifier_uuid", identifier_uuid)
.replace("$version", version)
)
# generate a new uuid
unique_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))
identifier_uuid = unique_uuid
except IndexError:
# default if method is not provided
unique_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, name))
identifier_uuid = unique_uuid
version = "1"

try:
ps = ps.replace("$status", status)
except IndexError:
ps = ps.replace("$status", "active")
try:
if method == "update":
if len(id.strip()) > 0:
version = get_resource(id, resource_type)
if version != "0":
# use the provided id
unique_uuid = id.strip()
identifier_uuid = id.strip()
else:
logging.info("Failed to get resource!")
raise ValueError("Trying to update a Non-existent resource")
else:
logging.info("The id is required!")
raise ValueError("The id is required to update a resource")
except IndexError:
raise ValueError("The id is required to update a resource")

# ps = payload_string
ps = (
payload_string.replace("$name", name)
.replace("$unique_uuid", unique_uuid)
.replace("$identifier_uuid", identifier_uuid)
.replace("$version", version)
)

if resource_type == "organizations":
ps = organization_extras(resource, ps)
elif resource_type == "locations":
ps = location_extras(resource, ps)
elif resource_type == "careTeams":
ps = care_team_extras(resource, ps, "min", [], [], "orgs & users")
try:
ps = ps.replace("$status", status)
except IndexError:
ps = ps.replace("$status", "active")

final_string = final_string + ps + ","
if resource_type == "organizations":
ps = organization_extras(resource, ps)
elif resource_type == "locations":
ps = location_extras(resource, ps)
elif resource_type == "careTeams":
ps = care_team_extras(resource, ps, "min", [], [], "orgs & users")

final_string = final_string + ps + ","

final_string = initial_string + final_string[:-1] + " ] } "
return final_string
@@ -964,6 +968,39 @@ def export_resources_to_csv(resource_type, parameter, value, limit):
logging.info("No Resources Found")
else:
logging.error(f"Failed to retrieve resource. Status code: {response[1]} response: {response[0]}")


class ResponseFilter(logging.Filter):
def __init__(self, param=None):
self.param = param

def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param in record.msg
return allow


LOGGING = {
'version': 1,
'filters': {
'custom-filter': {
'()': ResponseFilter,
'param': 'final-response',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['custom-filter']
}
},
'root': {
'level': 'INFO',
'handlers': ['console']
},
}


@click.command()
@@ -979,6 +1016,7 @@ def export_resources_to_csv(resource_type, parameter, value, limit):
@click.option("--group", required=False)
@click.option("--roles_max", required=False, default=500)
@click.option("--cascade_delete", required=False, default=False)
@click.option("--only_response", required=False)
@click.option(
"--log_level", type=click.Choice(["DEBUG", "INFO", "ERROR"], case_sensitive=False)
)
@@ -989,11 +1027,15 @@ def main(
log_level
):
if log_level == "DEBUG":
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(filename='importer.log', encoding='utf-8', level=logging.DEBUG)
elif log_level == "INFO":
logging.basicConfig(level=logging.INFO)
logging.basicConfig(filename='importer.log', encoding='utf-8', level=logging.INFO)
elif log_level == "ERROR":
logging.basicConfig(level=logging.ERROR)
logging.basicConfig(filename='importer.log', encoding='utf-8', level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler())

if only_response:
logging.config.dictConfig(LOGGING)

start_time = datetime.now()
logging.info("Start time: " + start_time.strftime("%H:%M:%S"))
@@ -1009,30 +1051,33 @@ def main(
global global_access_token
global_access_token = access_token

final_response = ""

logging.info("Starting csv import...")
resource_list = read_csv(csv_file)
if resource_list:
if resource_type == "users":
logging.info("Processing users")
for user in resource_list:
user_id = create_user(user)
if user_id == 0:
# user was not created above, check if it already exists
user_id = confirm_keycloak_user(user)
if user_id != 0:
# user_id has been retrieved
# check practitioner
practitioner_exists = confirm_practitioner(user, user_id)
if not practitioner_exists:
payload = create_user_resources(user_id, user)
handle_request("POST", payload, config.fhir_base_url)
logging.info("Processing complete!")
with click.progressbar(resource_list, label="Progress:Processing users ") as process_user_progress:
for user in process_user_progress:
user_id = create_user(user)
if user_id == 0:
# user was not created above, check if it already exists
user_id = confirm_keycloak_user(user)
if user_id != 0:
# user_id has been retrieved
# check practitioner
practitioner_exists = confirm_practitioner(user, user_id)
if not practitioner_exists:
payload = create_user_resources(user_id, user)
final_response = handle_request("POST", payload, config.fhir_base_url)
logging.info("Processing complete!")
elif resource_type == "locations":
logging.info("Processing locations")
json_payload = build_payload(
"locations", resource_list, "json_payloads/locations_payload.json"
)
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif resource_type == "organizations":
logging.info("Processing organizations")
@@ -1041,32 +1086,32 @@ def main(
resource_list,
"json_payloads/organizations_payload.json",
)
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif resource_type == "careTeams":
logging.info("Processing CareTeams")
json_payload = build_payload(
"careTeams", resource_list, "json_payloads/careteams_payload.json"
)
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif assign == "organization-Location":
logging.info("Assigning Organizations to Locations")
matches = extract_matches(resource_list)
json_payload = build_org_affiliation(matches, resource_list)
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif assign == "careTeam-Organization":
logging.info("Assigning CareTeam to Organization")
matches = extract_matches(resource_list)
json_payload = fetch_and_build(matches, "orgs")
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif assign == "user-careTeam":
logging.info("Assigning users to careTeam")
matches = extract_matches(resource_list)
json_payload = fetch_and_build(matches, "users")
handle_request("POST", json_payload, config.fhir_base_url)
final_response = handle_request("POST", json_payload, config.fhir_base_url)
logging.info("Processing complete!")
elif setup == "roles":
logging.info("Setting up keycloak roles")
@@ -1087,6 +1132,8 @@ def main(
else:
logging.error("Empty csv file!")

logging.info("{ \"final-response\": " + final_response.text + "}")

end_time = datetime.now()
logging.info("End time: " + end_time.strftime("%H:%M:%S"))
total_time = end_time - start_time

0 comments on commit 2619ae4

Please sign in to comment.