Skip to content

Commit

Permalink
Merge pull request #2811 from rina23q/bugfix/2793/c8y-firmware-update…
Browse files Browse the repository at this point in the history
…-should-have-external-url

Uploader supports multipart/form-data and attaches filename
  • Loading branch information
rina23q authored Apr 16, 2024
2 parents 9741b3d + d342cd5 commit 404dda3
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 28 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ log_manager = { path = "crates/common/log_manager" }
logged_command = { path = "crates/common/logged_command" }
maplit = "1.0"
miette = { version = "5.5.0", features = ["fancy"] }
mime = "0.3.17"
mime_guess = "2.0.4"
mockall = "0.11"
mockito = "1.1.0"
mqtt_channel = { path = "crates/common/mqtt_channel" }
Expand Down
8 changes: 7 additions & 1 deletion crates/common/upload/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ axum_tls = { workspace = true, features = ["error-matching"] }
backoff = { workspace = true }
camino = { workspace = true }
log = { workspace = true }
reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] }
mime = { workspace = true }
mime_guess = { workspace = true }
reqwest = { workspace = true, features = [
"stream",
"rustls-tls-native-roots",
"multipart",
] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["codec"] }
Expand Down
3 changes: 3 additions & 0 deletions crates/common/upload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ mod upload;
pub use crate::error::UploadError;
pub use crate::upload::Auth;
pub use crate::upload::ContentType;
pub use crate::upload::FormData;
pub use crate::upload::UploadInfo;
pub use crate::upload::UploadMethod;
pub use crate::upload::Uploader;
pub use mime::Mime;
125 changes: 108 additions & 17 deletions crates/common/upload/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use camino::Utf8Path;
use camino::Utf8PathBuf;
use log::info;
use log::warn;
use mime::Mime;
use mime_guess::MimeGuess;
use reqwest::header::CONTENT_LENGTH;
use reqwest::header::CONTENT_TYPE;
use reqwest::multipart;
use reqwest::Body;
use reqwest::Identity;
use std::fmt::Display;
use std::fmt::Formatter;
use std::time::Duration;
use tokio::fs::File;
use tokio_util::codec::BytesCodec;
Expand All @@ -27,26 +28,59 @@ fn default_backoff() -> ExponentialBackoff {
}
}

/// Auto tries to detect the mime from the given file extension without direct file access.
/// Custom sets a custom Content-Type.
/// If multipart request is needed, choose FormData.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ContentType {
TextPlain,
ApplicationOctetStream,
Auto,
Custom(Mime),
FormData(FormData),
}

impl Display for ContentType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ContentType::TextPlain => write!(f, "text/plain"),
ContentType::ApplicationOctetStream => write!(f, "application/octet-stream"),
/// Dataset to construct reqwest::multipart::Form.
/// To avoid using reqwest::multipart::Form inside the ContentType enum
/// since reqwest::multipart::Form doesn't support Copy or Clone.
/// If mime is None, the mime will be guessed while uploading a file.
#[derive(Debug, Eq, Clone, PartialEq)]
pub struct FormData {
filename: String,
mime: Option<Mime>,
}

impl FormData {
pub fn new(filename: String) -> Self {
Self {
filename,
mime: None,
}
}

pub fn set_mime(self, mime: Mime) -> Self {
Self {
mime: Some(mime),
..self
}
}

pub fn text_plain(self) -> Self {
self.set_mime(mime::TEXT_PLAIN)
}
}

/// Switch upload method
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum UploadMethod {
PUT,
POST,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct UploadInfo {
pub url: String,
pub auth: Option<Auth>,
pub content_type: ContentType,
pub method: UploadMethod,
}

impl From<&str> for UploadInfo {
Expand All @@ -60,7 +94,8 @@ impl UploadInfo {
Self {
url: url.into(),
auth: None,
content_type: ContentType::ApplicationOctetStream,
content_type: ContentType::Auto,
method: UploadMethod::PUT,
}
}

Expand All @@ -71,13 +106,17 @@ impl UploadInfo {
}
}

pub fn with_content_type(self, content_type: ContentType) -> Self {
pub fn set_content_type(self, content_type: ContentType) -> Self {
Self {
content_type,
..self
}
}

pub fn set_method(self, method: UploadMethod) -> Self {
Self { method, ..self }
}

pub fn url(&self) -> &str {
self.url.as_str()
}
Expand Down Expand Up @@ -166,18 +205,42 @@ impl Uploader {
info!("Redirecting request from {} to {target_url}", url.url())
}

// Todo: Ideally it detects the appropriate content-type automatically, e.g. UTF-8 => text/plain
let mut client = client
.put(target_url)
.header(CONTENT_TYPE, url.content_type.to_string())
.header(CONTENT_LENGTH, file_length);
let mut client = match url.method {
UploadMethod::PUT => client.put(target_url),
UploadMethod::POST => client.post(target_url),
};

if let Some(Auth::Bearer(token)) = &url.auth {
client = client.bearer_auth(token)
}

client = match url.content_type.clone() {
ContentType::Auto => {
let mime = MimeGuess::from_path(&self.source_filename).first_or_octet_stream();
client
.header(CONTENT_TYPE, mime.as_ref())
.header(CONTENT_LENGTH, file_length)
.body(file_body)
}
ContentType::Custom(mime) => client
.header(CONTENT_TYPE, mime.as_ref())
.header(CONTENT_LENGTH, file_length)
.body(file_body),
ContentType::FormData(data) => {
let mime = match data.mime {
None => MimeGuess::from_path(&self.source_filename).first_or_octet_stream(),
Some(mime) => mime,
};
let file_part = multipart::Part::stream_with_length(file_body, file_length)
.file_name(data.filename)
.mime_str(mime.as_ref())
.unwrap(); // safe, ensured that mime is valid
let form = multipart::Form::new().part("file", file_part);
client.multipart(form)
}
};

client
.body(file_body)
.send()
.await
.map_err(|err| {
Expand Down Expand Up @@ -255,6 +318,34 @@ mod tests {
assert!(uploader.upload(&url).await.is_ok())
}

#[tokio::test]
async fn upload_content_no_auth_post() {
let mut server = mockito::Server::new();
let _mock1 = server
.mock("POST", "/some_file.txt")
.with_status(201)
.create();

let mut target_url = server.url();
target_url.push_str("/some_file.txt");

let url = UploadInfo::new(&target_url)
.set_content_type(ContentType::FormData(FormData::new("filename".into())))
.set_method(UploadMethod::POST);

let ttd = TempTedgeDir::new();
ttd.file("file_upload.txt")
.with_raw_content("Hello, world!");

let mut uploader = Uploader::new(ttd.utf8_path().join("file_upload.txt"), None);
uploader.set_backoff(ExponentialBackoff {
current_interval: Duration::ZERO,
..Default::default()
});

assert!(uploader.upload(&url).await.is_ok())
}

#[tokio::test]
async fn upload_content_with_auth() {
let mut server = mockito::Server::new();
Expand Down
17 changes: 14 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use tedge_downloader_ext::DownloadResult;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tedge_uploader_ext::ContentType;
use tedge_uploader_ext::FormData;
use tedge_uploader_ext::UploadRequest;
use time::OffsetDateTime;
use tracing::log::warn;
Expand Down Expand Up @@ -183,6 +185,7 @@ impl CumulocityConverter {
fts_download: FtsDownloadOperationData,
) -> Result<Vec<MqttMessage>, ConversionError> {
let target = self.entity_store.try_get(&fts_download.entity_topic_id)?;
let xid = target.external_id.as_ref();
let smartrest_topic =
self.smartrest_publish_topic_for_entity(&fts_download.entity_topic_id)?;
let payload = fts_download.message.payload_str()?;
Expand Down Expand Up @@ -212,20 +215,28 @@ impl CumulocityConverter {
time: OffsetDateTime::now_utc(),
text: response.config_type.clone(),
extras: HashMap::new(),
device_id: target.external_id.as_ref().to_string(),
device_id: xid.to_string(),
};
let event_response_id = self.http_proxy.send_event(create_event).await?;

let binary_upload_event_url = self
.c8y_endpoint
.get_url_for_event_binary_upload_unchecked(&event_response_id);

let file_path = Utf8PathBuf::try_from(download.file_path).map_err(|e| e.into_io_error())?;

// The method must be POST, otherwise file name won't be supported.
let upload_request = UploadRequest::new(
self.auth_proxy
.proxy_url(binary_upload_event_url.clone())
.as_str(),
&Utf8PathBuf::try_from(download.file_path).map_err(|e| e.into_io_error())?,
);
&file_path,
)
.post()
.with_content_type(ContentType::FormData(FormData::new(format!(
"{xid}_{filename}",
filename = file_path.file_name().unwrap_or("filename")
))));

self.pending_upload_operations.insert(
cmd_id.clone(),
Expand Down
20 changes: 17 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tedge_uploader_ext::ContentType;
use tedge_uploader_ext::FormData;
use tedge_uploader_ext::UploadRequest;
use time::OffsetDateTime;
use tracing::debug;
Expand Down Expand Up @@ -187,6 +188,7 @@ impl CumulocityConverter {
let smartrest_topic = self.smartrest_publish_topic_for_entity(&topic_id)?;
let payload = fts_download.message.payload_str()?;
let response = &LogUploadCmdPayload::from_json(payload)?;
let xid = target.external_id.as_ref();

let download_response = match download_result {
Err(err) => {
Expand All @@ -212,21 +214,33 @@ impl CumulocityConverter {
time: OffsetDateTime::now_utc(),
text: response.log_type.clone(),
extras: HashMap::new(),
device_id: target.external_id.as_ref().to_string(),
device_id: xid.to_string(),
};
let event_response_id = self.http_proxy.send_event(create_event).await?;

let binary_upload_event_url = self
.c8y_endpoint
.get_url_for_event_binary_upload_unchecked(&event_response_id);

let file_path =
Utf8PathBuf::try_from(download_response.file_path).map_err(|e| e.into_io_error())?;

// The method must be POST, otherwise file name won't be supported.
// Mime must be text/*, otherwise c8y UI doesn't give a preview of the content.
let upload_request = UploadRequest::new(
self.auth_proxy
.proxy_url(binary_upload_event_url.clone())
.as_str(),
&Utf8PathBuf::try_from(download_response.file_path).map_err(|e| e.into_io_error())?,
&file_path,
)
.with_content_type(ContentType::TextPlain);
.post()
.with_content_type(ContentType::FormData(
FormData::new(format!(
"{xid}_{filename}",
filename = file_path.file_name().unwrap_or("filename")
))
.text_plain(),
));

self.uploader_sender
.send((cmd_id.clone(), upload_request))
Expand Down
Loading

0 comments on commit 404dda3

Please sign in to comment.