Skip to content

Commit

Permalink
fix( gcp_chronicle sink): add content encoding header when compressio…
Browse files Browse the repository at this point in the history
…n is enabled (#22009)

* fix(#22007): add content encoding header when compression is enabled

* docs(#22007): add change log file

* Update changelog.d/22007_chronicle_ingest_header.fix.md

Co-authored-by: Pavlos Rontidis <[email protected]>

* cargo fmt

* revert Cargo.lock

* clippy fixes

* clippy fixes

---------

Co-authored-by: Matt Searle <[email protected]>
Co-authored-by: Pavlos Rontidis <[email protected]>
  • Loading branch information
3 people authored Jan 13, 2025
1 parent 7f10bf9 commit 26bd3d1
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
3 changes: 3 additions & 0 deletions changelog.d/22007_chronicle_ingest_header.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `chronicle_unstructured` sink now sets the `content-encoding` header when compression is enabled.

authors: chocpanda
45 changes: 34 additions & 11 deletions src/sinks/gcp_chronicle/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use bytes::{Bytes, BytesMut};

use futures_util::{future::BoxFuture, task::Poll};
use goauth::scopes::Scope;
use http::{header::HeaderValue, Request, StatusCode, Uri};
use http::header::{self, HeaderName, HeaderValue};
use http::{Request, StatusCode, Uri};
use hyper::Body;
use indoc::indoc;
use serde::Serialize;
Expand Down Expand Up @@ -318,6 +319,7 @@ impl ChronicleUnstructuredConfig {
pub struct ChronicleRequest {
pub body: Bytes,
pub finalizers: EventFinalizers,
pub headers: HashMap<HeaderName, HeaderValue>,
metadata: RequestMetadata,
}

Expand Down Expand Up @@ -471,7 +473,33 @@ impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBui
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
let mut headers = HashMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);

match payload.compressed_byte_size {
Some(compressed_byte_size) => {
headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
);
headers.insert(
header::CONTENT_ENCODING,
HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
);
}
None => {
headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
);
}
}

ChronicleRequest {
headers,
body: payload.into_payload().bytes,
finalizers,
metadata,
Expand Down Expand Up @@ -547,18 +575,13 @@ impl Service<ChronicleRequest> for ChronicleService {

fn call(&mut self, request: ChronicleRequest) -> Self::Future {
let mut builder = Request::post(&self.base_url);
let headers = builder.headers_mut().unwrap();
headers.insert(
"content-type",
HeaderValue::from_str("application/json").unwrap(),
);
headers.insert(
"content-length",
HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
);

let metadata = request.get_metadata().clone();

let headers = builder.headers_mut().unwrap();
for (name, value) in request.headers {
headers.insert(name, value);
}

let mut http_request = builder.body(Body::from(request.body)).unwrap();
self.creds.apply(&mut http_request);

Expand Down

0 comments on commit 26bd3d1

Please sign in to comment.