From 4dca57b90aeddcbaf5715c7185ed29aa1e39af8d Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Wed, 4 Dec 2024 18:23:23 +0530 Subject: [PATCH] update: fix worker trigger for aws --- crates/orchestrator/src/queue/job_queue.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index c2e2be29..636ce742 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -231,10 +231,11 @@ fn parse_job_message(message: &Delivery) -> Result, Cons /// Using string since localstack currently is instable with deserializing maps. /// Change this to accept a map after localstack is stable fn parse_worker_message(message: &Delivery) -> Result, ConsumptionError> { - let message: String = - message.payload_serde_json().expect("message processing failed").expect("message unwrapping failed"); - let trigger_type = WorkerTriggerType::from_str(message.as_str()).expect("trigger type unwrapping failed"); - + let payload = message + .borrow_payload() + .ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?; + let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string(); + let trigger_type = WorkerTriggerType::from_str(message_string.as_str()).expect("trigger type unwrapping failed"); Ok(Some(WorkerTriggerMessage { worker: trigger_type })) }