Skip to content

Commit

Permalink
Removed supported programs synchro
Browse files Browse the repository at this point in the history
  • Loading branch information
dborowiecki committed Jul 2, 2024
1 parent 567ef85 commit 7e19dda
Showing 1 changed file with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ def synchronize_facilities(self):

create, update, delete = self._split_df(joined)

logging.info("Synchronizing Facilities")
self._create_new_facilities(create)
logging.info("Updating Facilities")
self._update_existing_facilities(update)

logging.info("Deactivating Deleted Facilities")
self._delete_removed_facilities(delete)

# Log the results
Expand All @@ -136,6 +140,7 @@ def validate_and_transform(self, facilities):
df = validate_facilities_dataframe(df).filter(
col("facilities.is_deleted") == False
)

self.synchronize_mising_geographic_zones(df)
self.synchronize_mising_types(df)
self.synchronize_products(df)
Expand Down Expand Up @@ -212,6 +217,7 @@ def _split_df(self, df):
deleted = df.filter(col("facilities.is_deleted") == True).filter(
col("existing_facilities.id").isNotNull()
)

existing = df.filter(col("facilities.is_deleted") == False)

new_facilities = existing.filter(col("existing_facilities.id").isNull())
Expand All @@ -228,24 +234,27 @@ def _create_new_facilities(self, facilities):
col(f"facilities.code"),
col("municipality.id"),
col("facility_type.id"),
col("code_id_dict"),
lit(True),
lit({}),
col("is_operational"),
lit(True)
),
)


logging.info(F"New Facilities to be created: { df.count()}")
for row in df.collect():
self._create_request(row)

def format_payload_f(self):
format_payload_f = udf(
lambda id, name, code, geographic_zone, facility_type, supported_programs, enabled: json.dumps(
lambda id, name, code, geographic_zone, facility_type, supported_programs, operational, enabled: json.dumps(
{
"id": id,
"code": code,
"name": name,
"geographicZone": {"id": geographic_zone},
"type": {"id": facility_type},
"active": enabled,
"active": operational,
"enabled": enabled,
"openLmisAccessible": enabled,
"supportedPrograms": [
Expand Down Expand Up @@ -316,7 +325,8 @@ def _update_existing_facilities(self, facilities: DataFrame, is_deleted=False):
col(f"facilities.code"),
col("municipality.id"),
col("facility_type.id"),
col("mergedServices"),
col("existing_facilities.supported_programs"), # Use Existing Services
col("is_operational"),
lit(not is_deleted),
),
)
Expand All @@ -330,6 +340,7 @@ def _update_existing_facilities(self, facilities: DataFrame, is_deleted=False):
col("existing_facilities.geographiczoneid"),
col("existing_facilities.typeid"),
col("existing_facilities.supported_programs"),
col("existing_facilities.active"),
col("existing_facilities.enabled"),
),
)
Expand All @@ -348,23 +359,9 @@ def compare_for_any_change(df, col1, col2):
changes = []
for field in schema.fields:
field_name = field.name
if field.dataType.simpleString().startswith("array"):
changes.append(
when(
size(
concat(
array_except(
f"{col1}.{field_name}", f"{col2}.{field_name}"
),
array_except(
f"{col2}.{field_name}", f"{col1}.{field_name}"
),
)
)
== 0,
False,
).otherwise(True)
)
# Skip supported programs as they remain the same.
if field_name == 'supportedPrograms':
continue
else:
changes.append(
col(f"{col1}.{field_name}") != col(f"{col2}.{field_name}")
Expand All @@ -385,15 +382,20 @@ def compare_for_any_change(df, col1, col2):

# Select the original JSON payloads and the any_change flag
return df.select(
"payload", "existing_facilities.id", "oldPayload", change_column
"payload", "existing_facilities.id", "oldPayload", change_column, *changes2
)


logging.info(F"Comparing Changes For Facilities: {facilities.count()}" )
# Apply the comparison function
result_df = compare_for_any_change(facilities, "json1_struct", "json2_struct")

result_df = result_df.filter(col("any_change") == True)[
["payload", "existing_facilities.id"]
]

logging.info(F"Facilities That Changed since last update: {result_df.count()}")

for row in result_df.collect():
self._update_request(row)

Expand Down

0 comments on commit 7e19dda

Please sign in to comment.