Skip to content

feat: allow stream creation from ingestor in distributed deployments #980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::get_staging_metadata;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
Expand Down Expand Up @@ -64,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
body: Bytes,
stream_name: &str,
skip_ingestor: Option<String>,
) -> Result<(), StreamError> {
let mut reqwest_headers = http_header::HeaderMap::new();

Expand All @@ -76,7 +78,16 @@ pub async fn sync_streams_with_ingestors(
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {

let final_ingestor_infos = match skip_ingestor {
None => ingestor_infos,
Some(skip_ingestor) => ingestor_infos
.into_iter()
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
.collect::<Vec<IngestorMetadata>>(),
};

for ingestor in final_ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
Expand Down Expand Up @@ -841,3 +852,62 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

Ok(())
}

/// Forwards a stream-creation request from an ingestor to the querier.
///
/// Reads the querier endpoint and auth token from the staging metadata,
/// verifies the querier is live, then issues a PUT to the querier's
/// logstream endpoint with `skip_ingestors` set to this ingestor's own
/// endpoint so the querier does not sync the stream back to the caller.
///
/// # Errors
/// Returns a `StreamError` when the staging metadata is missing or lacks
/// the querier fields, when the querier is not live, when the request
/// fails at the network level, or when the querier returns a non-success
/// status.
pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
    let client = reqwest::Client::new();

    let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
        StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
    })?;
    // These fields are only populated when the metadata was written by a
    // Query/All mode server (see StorageMetadata::new / v4_v5 migration).
    // Fail with a descriptive error instead of panicking when absent.
    let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.ok_or_else(|| {
        StreamError::Anyhow(anyhow::anyhow!(
            "Querier endpoint not found in staging metadata"
        ))
    })?);
    let token = staging_metadata.querier_auth_token.ok_or_else(|| {
        StreamError::Anyhow(anyhow::anyhow!(
            "Querier auth token not found in staging metadata"
        ))
    })?;

    if !check_liveness(&querier_endpoint).await {
        log::warn!("Querier {} is not live", querier_endpoint);
        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
    }

    // Ask the querier to skip syncing back to this ingestor, since the
    // stream is about to be created locally anyway.
    let url = format!(
        "{}{}/logstream/{}?skip_ingestors={}",
        querier_endpoint,
        base_path_without_preceding_slash(),
        stream_name,
        CONFIG.parseable.ingestor_endpoint,
    );

    let response = client
        .put(&url)
        .header(header::AUTHORIZATION, &token)
        .send()
        .await
        .map_err(|err| {
            log::error!(
                "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
                &url,
                err
            );
            StreamError::Network(err)
        })?;

    let status = response.status();

    if !status.is_success() {
        // Capture the body before reporting so the querier's reason is logged.
        let response_text = response.text().await.map_err(|err| {
            log::error!("Failed to read response text from querier: {}", &url);
            StreamError::Network(err)
        })?;

        log::error!(
            "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
            &url,
            response_text
        );

        return Err(StreamError::Anyhow(anyhow::anyhow!(
            "Request failed with status: {}",
            status,
        )));
    }

    Ok(())
}
16 changes: 11 additions & 5 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::event::{
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::cluster::forward_create_stream_request;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
Expand Down Expand Up @@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
if !streams.contains(&LogStream {
name: stream_name.to_owned(),
}) {
log::error!("Stream {} not found", stream_name);
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream `{}` not found. Please create it using the Query server.",
stream_name
)));
match forward_create_stream_request(stream_name).await {
Ok(()) => log::info!("Stream {} created", stream_name),
Err(e) => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Unable to create stream: {} using query server. Error: {}",
stream_name,
e.to_string(),
)))
}
};
}
metadata::STREAM_INFO
.upsert_stream_info(
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
}
Ok(())
}
Expand Down
20 changes: 18 additions & 2 deletions server/src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use core::str;
use std::fs;

use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use serde::Deserialize;
use tokio::sync::Mutex;

static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

use crate::{
event,
Expand Down Expand Up @@ -74,11 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
/// Query parameters accepted by the querier's `PUT /logstream/{logstream}` handler.
#[derive(Deserialize)]
pub struct PutStreamQuery {
    // Ingestor endpoint to exclude when syncing the new stream to ingestors.
    // NOTE(review): despite the plural name, downstream filtering compares it
    // as a single endpoint value — confirm whether comma-separated lists are
    // intended to be supported.
    skip_ingestors: Option<String>,
}

/// Handler for `PUT /logstream/{logstream}` on the querier.
///
/// Creates or updates the stream locally, then fans the request out to all
/// ingestors except any endpoint named in the `skip_ingestors` query
/// parameter (set when the request originated from an ingestor, so the
/// stream is not echoed back to it).
///
/// # Errors
/// Propagates `StreamError` from stream creation or ingestor sync.
pub async fn put_stream(
    req: HttpRequest,
    body: Bytes,
    info: web::Query<PutStreamQuery>,
) -> Result<impl Responder, StreamError> {
    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

    // Bind the guard to a named variable: `let _ = ...` drops the MutexGuard
    // immediately, releasing the lock before create/sync runs and letting
    // concurrent stream creations race. `_guard` holds it until function exit.
    let _guard = CREATE_STREAM_LOCK.lock().await;
    let headers = create_update_stream(&req, &body, &stream_name).await?;

    sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;

    Ok(("Log stream created", StatusCode::OK))
}
Expand Down
4 changes: 4 additions & 0 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
let metadata = metadata_migration::v3_v4(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
Some("v4") => {
let metadata = metadata_migration::v4_v5(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
_ => (),
}
}
Expand Down
50 changes: 50 additions & 0 deletions server/src/migration/metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

use base64::Engine;
use rand::distributions::DistString;
use serde_json::{Map, Value as JsonValue};

Expand Down Expand Up @@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
storage_metadata
}

// maybe rename
pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
let metadata = storage_metadata.as_object_mut().unwrap();
metadata.remove_entry("version");
metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));

match metadata.get("server_mode") {
None => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
}
Some(JsonValue::String(mode)) => match mode.as_str() {
"Query" => {
metadata.insert(
"querier_endpoint".to_string(),
JsonValue::String(CONFIG.parseable.address.clone()),
);
}
"All" => {
metadata.insert(
"server_mode".to_string(),
JsonValue::String(CONFIG.parseable.mode.to_string()),
);
metadata.insert(
"querier_endpoint".to_string(),
JsonValue::String(CONFIG.parseable.address.clone()),
);
}
_ => (),
},
_ => (),
}

metadata.insert(
"querier_auth_token".to_string(),
JsonValue::String(format!(
"Basic {}",
base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
))
)),
);

storage_metadata
}

pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
let imp = ingestor_metadata_path(None);
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
Expand Down
3 changes: 2 additions & 1 deletion server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;
pub use store_metadata::{
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
StorageMetadata,
};

// metadata file names in a Stream prefix
Expand Down
23 changes: 22 additions & 1 deletion server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
path::PathBuf,
};

use base64::Engine;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use relative_path::RelativePathBuf;
Expand Down Expand Up @@ -63,10 +64,29 @@ pub struct StorageMetadata {
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
pub default_role: Option<String>,
pub querier_endpoint: Option<String>,
pub querier_auth_token: Option<String>,
}

impl StorageMetadata {
pub fn new() -> Self {
let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode {
Mode::All | Mode::Query => {
let querier_auth_token = format!(
"Basic {}",
base64::prelude::BASE64_STANDARD.encode(format!(
"{}:{}",
CONFIG.parseable.username, CONFIG.parseable.password
))
);
(
Some(CONFIG.parseable.address.clone()),
Some(querier_auth_token),
)
}
Mode::Ingest => (None, None),
};

Self {
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
mode: CONFIG.storage_name.to_owned(),
Expand All @@ -78,9 +98,10 @@ impl StorageMetadata {
streams: Vec::new(),
roles: HashMap::default(),
default_role: None,
querier_endpoint,
querier_auth_token,
}
}

pub fn global() -> &'static StaticStorageMetadata {
STORAGE_METADATA
.get()
Expand Down
Loading