Skip to content

Commit eed3967

Browse files
committed
fix(ingestor): skip self when forwarding put stream request to querier
1 parent 771eec9 commit eed3967

File tree

3 files changed

+29
-6
lines changed

3 files changed

+29
-6
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
6565
headers: HeaderMap,
6666
body: Bytes,
6767
stream_name: &str,
68+
skip_ingestor: Option<String>,
6869
) -> Result<(), StreamError> {
6970
let mut reqwest_headers = http_header::HeaderMap::new();
7071

@@ -77,7 +78,16 @@ pub async fn sync_streams_with_ingestors(
7778
})?;
7879

7980
let client = reqwest::Client::new();
80-
for ingestor in ingestor_infos.iter() {
81+
82+
let final_ingestor_infos = match skip_ingestor {
83+
None => ingestor_infos,
84+
Some(skip_ingestor) => ingestor_infos
85+
.into_iter()
86+
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
87+
.collect::<Vec<IngestorMetadata>>(),
88+
};
89+
90+
for ingestor in final_ingestor_infos {
8191
if !utils::check_liveness(&ingestor.domain_name).await {
8292
log::warn!("Ingestor {} is not live", ingestor.domain_name);
8393
continue;
@@ -858,10 +868,11 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre
858868
}
859869

860870
let url = format!(
861-
"{}{}/logstream/{}",
871+
"{}{}/logstream/{}?skip_ingestors={}",
862872
querier_endpoint,
863873
base_path_without_preceding_slash(),
864-
stream_name
874+
stream_name,
875+
CONFIG.parseable.ingestor_endpoint,
865876
);
866877

867878
let response = client

server/src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
654654
header::CONTENT_TYPE,
655655
HeaderValue::from_static("application/json"),
656656
);
657-
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
657+
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
658658
}
659659
Ok(())
660660
}

server/src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use core::str;
12
use std::fs;
23

34
use actix_web::{web, HttpRequest, Responder};
45
use bytes::Bytes;
56
use chrono::Utc;
67
use http::StatusCode;
8+
use serde::Deserialize;
79
use tokio::sync::Mutex;
810

911
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
@@ -77,12 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
7779
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
7880
}
7981

80-
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
82+
#[derive(Deserialize)]
83+
pub struct PutStreamQuery {
84+
skip_ingestors: Option<String>,
85+
}
86+
87+
pub async fn put_stream(
88+
req: HttpRequest,
89+
body: Bytes,
90+
info: web::Query<PutStreamQuery>,
91+
) -> Result<impl Responder, StreamError> {
8192
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
8293

8394
let _ = CREATE_STREAM_LOCK.lock().await;
8495
let headers = create_update_stream(&req, &body, &stream_name).await?;
85-
sync_streams_with_ingestors(headers, body, &stream_name).await?;
96+
97+
sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;
8698

8799
Ok(("Log stream created", StatusCode::OK))
88100
}

0 commit comments

Comments
 (0)