Skip to content

Commit

Permalink
Merge pull request #8 from OpenLMIS-Angola/improvement/facilities_import
Browse files Browse the repository at this point in the history
Change delete behavior, merge products
  • Loading branch information
malinowskikam authored Jun 25, 2024
2 parents f2e74f9 + bb047cf commit 5b923c8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 32 deletions.
8 changes: 2 additions & 6 deletions sigeca_data_import_microservice/app/application/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@


class FacilitySyncScheduler:
def __init__(
self,
sync_service: FacilitySynchronizationService,
interval: int
):
def __init__(self, sync_service: FacilitySynchronizationService, interval: int):
self.sync_service = sync_service
self.sync_interval_minutes = interval
self.scheduler = BlockingScheduler()

def start(self):
self.run_sync() # Sync immedietly don't wait the interval for first trigger
self.run_sync() # Sync immedietly don't wait the interval for first trigger
self.scheduler.add_job(
self.run_sync, "interval", minutes=self.sync_interval_minutes
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
import uuid
from pyspark.sql.functions import col, udf, from_json, when, array_except, concat, size, lit
from pyspark.sql.functions import (
col,
udf,
from_json,
when,
array_except,
concat,
size,
lit,
)
from app.domain.resources import (
FacilityResourceRepository,
GeographicZoneResourceRepository,
Expand Down Expand Up @@ -200,7 +209,9 @@ def synchronize_products(self, df):
).synchronize()

def _split_df(self, df):
deleted = df.filter(col("facilities.is_deleted") == True)
deleted = df.filter(col("facilities.is_deleted") == True).filter(
col("existing_facilities.id").isNotNull()
)
existing = df.filter(col("facilities.is_deleted") == False)

new_facilities = existing.filter(col("existing_facilities.id").isNull())
Expand All @@ -218,6 +229,7 @@ def _create_new_facilities(self, facilities):
col("municipality.id"),
col("facility_type.id"),
col("code_id_dict"),
lit(True),
),
)

Expand All @@ -226,15 +238,15 @@ def _create_new_facilities(self, facilities):

def format_payload_f(self):
format_payload_f = udf(
lambda id, name, code, geographic_zone, facility_type, supported_programs: json.dumps(
lambda id, name, code, geographic_zone, facility_type, supported_programs, enabled: json.dumps(
{
"id": id,
"code": code,
"name": name,
"geographicZone": {"id": geographic_zone},
"type": {"id": facility_type},
"active": True,
"enabled": True,
"active": enabled,
"enabled": enabled,
"supportedPrograms": [
{"id": program_id}
for program_id in json.loads(supported_programs).values()
Expand All @@ -248,23 +260,47 @@ def format_payload_f(self):
def _create_request(self, data):
try:
self.lmis_client.send_post_request("facilities", data["payload"])
except Exception as e:
logging.error(f"An error occurred during facility creation request ({data}): {e}")
except Exception as e:
logging.error(
f"An error occurred during facility creation request ({data}): {e}"
)

def _update_request(self, data):
try:
self.lmis_client.send_put_request("facilities", data["id"], data["payload"])
except Exception as e:
logging.error(f"An error occurred during facility update request ({data}): {e}")
except Exception as e:
logging.error(
f"An error occurred during facility update request ({data}): {e}"
)

def _delete_request(self, data):
try:
self.lmis_client.send_delete_request("facilities", data["id"])
except Exception as e:
logging.error(f"An error occurred during facility delete request ({data}): {e}")
except Exception as e:
logging.error(
f"An error occurred during facility delete request ({data}): {e}"
)

def merge_json_f(self):
    """Build a Spark UDF that merges two JSON-object strings into one.

    The returned UDF takes two string columns, each expected to hold a
    JSON object, and returns the JSON string of their combined keys.
    When both inputs define the same key, the value from the FIRST
    argument wins.

    A missing input (SQL NULL, which reaches a Python UDF as ``None``,
    an empty string, or a JSON ``null``) is treated as an empty object,
    so one facility with absent data does not raise inside the executor
    and abort the whole job.

    Returns:
        A UDF producing a string column with the merged JSON object.
    """

    def _load(raw):
        # json.loads(None) raises TypeError and json.loads("") raises
        # JSONDecodeError; both mean "nothing to merge" here.
        if not raw:
            return {}
        # A JSON ``null`` parses to None, which cannot be unpacked below.
        return json.loads(raw) or {}

    def _inner_merge(json1, json2):
        # Unpack json2 first so json1's entries override it on key clashes.
        merged_dict = {**_load(json2), **_load(json1)}
        return json.dumps(merged_dict)

    return udf(_inner_merge, StringType())

def _update_existing_facilities(self, facilities: DataFrame, is_deleted=False):
merge_json_udf = self.merge_json_f()
facilities = facilities.withColumn(
"mergedServices",
merge_json_udf(
col("existing_facilities.supported_programs"), col("code_id_dict")
),
)

def _update_existing_facilities(self, facilities: DataFrame):
format_payload_f = self.format_payload_f()

facilities = facilities.withColumn(
"payload",
format_payload_f(
Expand All @@ -273,7 +309,8 @@ def _update_existing_facilities(self, facilities: DataFrame):
col(f"facilities.code"),
col("municipality.id"),
col("facility_type.id"),
col("code_id_dict"),
col("mergedServices"),
lit(not is_deleted),
),
)

Expand All @@ -286,6 +323,7 @@ def _update_existing_facilities(self, facilities: DataFrame):
col("existing_facilities.geographiczoneid"),
col("existing_facilities.typeid"),
col("existing_facilities.supported_programs"),
col("existing_facilities.enabled"),
),
)

Expand Down Expand Up @@ -334,8 +372,9 @@ def compare_for_any_change(df, col1, col2):
.alias("any_change")
)
else:
change_column = lit(False).alias("any_change") # Handle the case when changes list is empty

change_column = lit(False).alias(
"any_change"
) # Handle the case when changes list is empty

# Select the original JSON payloads and the any_change flag
return df.select(
Expand All @@ -351,7 +390,5 @@ def compare_for_any_change(df, col1, col2):
self._update_request(row)

def _delete_removed_facilities(self, facilities):
df = facilities[["existing_facilities.id"]]
for row in df.collect():
if row["id"]:
self._delete_request(row)
# Delete doesn't remove the facility; it sets it to disabled and inactive
self._update_existing_facilities(facilities, True)
15 changes: 8 additions & 7 deletions sigeca_data_import_microservice/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import os

from app.application.scheduler import FacilitySyncScheduler
from app.application.synchronization.facilities import \
FacilitySynchronizationService
from app.domain.resources import (FacilityOperatorResourceRepository,
FacilityResourceRepository,
FacilityTypeResourceRepository,
GeographicZoneResourceRepository,
ProgramResourceRepository)
from app.application.synchronization.facilities import FacilitySynchronizationService
from app.domain.resources import (
FacilityOperatorResourceRepository,
FacilityResourceRepository,
FacilityTypeResourceRepository,
GeographicZoneResourceRepository,
ProgramResourceRepository,
)
from app.infrastructure.database import get_engine
from app.infrastructure.jdbc_reader import JDBCReader
from app.infrastructure.open_lmis_api_client import OpenLmisApiClient
Expand Down

0 comments on commit 5b923c8

Please sign in to comment.