Skip to content

Commit

Permalink
Add Serialized wrapper to make it harder to accidentally deserializ…
Browse files Browse the repository at this point in the history
…e the wrong type
  • Loading branch information
Kobzol committed Oct 18, 2024
1 parent 98ea7ba commit 0c5d94d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 18 deletions.
11 changes: 4 additions & 7 deletions crates/hyperqueue/src/client/commands/journal/output.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::client::output::json::format_datetime;
use crate::server::event::payload::EventPayload;
use crate::server::event::{bincode_config, Event};
use crate::transfer::messages::JobDescription;
use bincode::Options;
use crate::server::event::Event;
use crate::transfer::messages::{JobDescription, SubmitRequest};
use serde_json::json;
use tako::worker::WorkerOverview;

Expand Down Expand Up @@ -103,14 +102,12 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
closed_job,
serialized_desc,
} => {
let job_desc: JobDescription = bincode_config()
.deserialize(&serialized_desc)
.expect("Invalid job description data");
let submit: SubmitRequest = serialized_desc.deserialize().expect("Invalid submit data");
json!({
"type": "job-created",
"job": job_id,
"closed_job": closed_job,
"desc": JobInfoFormatter(&job_desc).to_json(),
"desc": JobInfoFormatter(&submit.job_desc).to_json(),
})
}
EventPayload::JobCompleted(job_id) => json!({
Expand Down
32 changes: 32 additions & 0 deletions crates/hyperqueue/src/server/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use bincode::Options;
use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use payload::EventPayload;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;

pub type EventId = u32;

Expand All @@ -21,3 +23,33 @@ pub struct Event {
pub(crate) fn bincode_config() -> impl Options {
bincode::DefaultOptions::new().allow_trailing_bytes()
}

/// Strongly typed wrapper over <T> serialized with Bincode.
#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized<T: Serialize + DeserializeOwned> {
#[serde(with = "serde_bytes")]
data: Vec<u8>,
_phantom: PhantomData<T>,
}

impl<T: Serialize + DeserializeOwned> Clone for Serialized<T> {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
_phantom: PhantomData,
}
}
}

impl<T: Serialize + DeserializeOwned> Serialized<T> {
pub fn new(value: &T) -> bincode::Result<Self> {
Ok(Self {
data: bincode_config().serialize(value)?,
_phantom: Default::default(),
})
}

pub fn deserialize(&self) -> bincode::Result<T> {
bincode_config().deserialize(&self.data)
}
}
9 changes: 5 additions & 4 deletions crates/hyperqueue/src/server/event/payload.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::server::autoalloc::AllocationId;
use crate::server::autoalloc::QueueId;
use crate::transfer::messages::{AllocationQueueParams, JobDescription};
use crate::server::event::Serialized;
use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
use crate::JobId;
use crate::{JobTaskId, WorkerId};
use serde::{Deserialize, Serialize};
Expand All @@ -19,13 +20,13 @@ pub enum EventPayload {
WorkerOverviewReceived(WorkerOverview),
/// A Job was submitted by the user -- full information to reconstruct the job;
/// it will be only stored into file, not held in memory
/// Vec<u8> is serialized JobDescription; the main reason is avoiding duplication of JobDescription
/// Vec<u8> is serialized SubmitRequest; the main reason is avoiding duplication of SubmitRequest
/// (we serialize it before it is stripped down)
/// and a nice side effect is that Events can be deserialized without deserializing a potentially large submit data
Submit {
job_id: JobId,
closed_job: bool,
serialized_desc: Vec<u8>,
serialized_desc: Serialized<SubmitRequest>,
},
/// All tasks of the job have finished.
JobCompleted(JobId),
Expand All @@ -43,7 +44,7 @@ pub enum EventPayload {
job_id: JobId,
task_id: JobTaskId,
},
// Task that failed to execute
/// Task has failed to execute
TaskFailed {
job_id: JobId,
task_id: JobTaskId,
Expand Down
5 changes: 2 additions & 3 deletions crates/hyperqueue/src/server/event/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::server::autoalloc::{AllocationId, QueueId};
use crate::server::event::log::{EventStreamMessage, EventStreamSender};
use crate::server::event::payload::EventPayload;
use crate::server::event::{bincode_config, Event};
use crate::server::event::{Event, Serialized};
use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
use crate::{JobId, JobTaskId, WorkerId};
use bincode::Options;
use chrono::Utc;
use smallvec::SmallVec;
use tako::gateway::LostWorkerReason;
Expand Down Expand Up @@ -64,7 +63,7 @@ impl EventStreamer {
self.send_event(EventPayload::Submit {
job_id,
closed_job: submit_request.job_id.is_none(),
serialized_desc: bincode_config().serialize(submit_request)?,
serialized_desc: Serialized::new(submit_request)?,
});
Ok(())
}
Expand Down
5 changes: 1 addition & 4 deletions crates/hyperqueue/src/server/restore.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::server::autoalloc::QueueId;
use crate::server::client::submit_job_desc;
use crate::server::event::bincode_config;
use crate::server::event::log::JournalReader;
use crate::server::event::payload::EventPayload;
use crate::server::job::{Job, JobTaskState, StartedTaskData};
Expand All @@ -10,7 +9,6 @@ use crate::transfer::messages::{
};
use crate::worker::start::RunningTaskContext;
use crate::{JobId, JobTaskId, Map};
use bincode::Options;
use std::path::Path;
use tako::gateway::NewTasksMessage;
use tako::{ItemId, WorkerId};
Expand Down Expand Up @@ -177,8 +175,7 @@ impl StateRestorer {
serialized_desc,
} => {
log::debug!("Replaying: JobTasksCreated {job_id}");
let submit_request: SubmitRequest =
bincode_config().deserialize(&serialized_desc)?;
let submit_request: SubmitRequest = serialized_desc.deserialize()?;
if closed_job {
let mut job = RestorerJob::new(submit_request.job_desc, false);
job.add_submit(submit_request.submit_desc);
Expand Down

0 comments on commit 0c5d94d

Please sign in to comment.