Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add strict mode to validate protocol strings #3638

Merged
merged 14 commits into from
Apr 15, 2024
6 changes: 5 additions & 1 deletion src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ where

if opts.prom_store.enable {
builder = builder
.with_prom_handler(self.instance.clone(), opts.prom_store.with_metric_engine)
.with_prom_handler(
self.instance.clone(),
opts.prom_store.with_metric_engine,
opts.http.is_strict_mode,
)
.with_prometheus_handler(self.instance.clone());
}

Expand Down
40 changes: 37 additions & 3 deletions src/servers/benches/prom_decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;

fn bench_decode_prom_request(c: &mut Criterion) {
fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");
Expand All @@ -30,6 +30,7 @@ fn bench_decode_prom_request(c: &mut Criterion) {

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = false;
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
Expand All @@ -43,11 +44,44 @@ fn bench_decode_prom_request(c: &mut Criterion) {
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data).unwrap();
prom_request.merge(data, is_strict_mode).unwrap();
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(benches, bench_decode_prom_request);
fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");

let data = Bytes::from(std::fs::read(d).unwrap());

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = true;
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
b.iter(|| {
request.clear();
let data = data.clone();
request.merge(data).unwrap();
to_grpc_row_insert_requests(&request).unwrap();
});
})
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data, is_strict_mode).unwrap();
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(
benches,
bench_decode_prom_request_without_strict_mode,
bench_decode_prom_request_with_strict_mode
);
criterion_main!(benches);
36 changes: 28 additions & 8 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct HttpOptions {
pub disable_dashboard: bool,

pub body_limit: ReadableSize,

pub is_strict_mode: bool,
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for HttpOptions {
Expand All @@ -139,6 +141,7 @@ impl Default for HttpOptions {
timeout: Duration::from_secs(30),
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
is_strict_mode: false,
}
}
}
Expand Down Expand Up @@ -505,11 +508,12 @@ impl HttpServerBuilder {
self,
handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),
HttpServer::route_prom(handler, prom_store_with_metric_engine),
HttpServer::route_prom(handler, prom_store_with_metric_engine, is_strict_mode),
),
..self
}
Expand Down Expand Up @@ -701,15 +705,31 @@ impl HttpServer {
fn route_prom<S>(
prom_handler: PromStoreProtocolHandlerRef,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Router<S> {
let mut router = Router::new().route("/read", routing::post(prom_store::remote_read));
if prom_store_with_metric_engine {
router = router.route("/write", routing::post(prom_store::remote_write));
} else {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
);
match (prom_store_with_metric_engine, is_strict_mode) {
(true, true) => {
router = router.route("/write", routing::post(prom_store::remote_write))
}
(true, false) => {
router = router.route(
"/write",
routing::post(prom_store::remote_write_without_strict_mode),
)
}
(false, true) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine),
)
}
(false, false) => {
router = router.route(
"/write",
routing::post(prom_store::route_write_without_metric_engine_and_strict_mode),
)
}
}
router.with_state(prom_handler)
}
Expand Down
83 changes: 79 additions & 4 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,38 @@ pub async fn route_write_without_metric_engine(
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body).await?;
let (request, samples) = decode_remote_write_request(is_zstd, body, true).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
)
.into_response())
}

/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, false).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
Expand Down Expand Up @@ -127,7 +158,8 @@ pub async fn remote_write(
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request_to_row_inserts(is_zstd, body).await?;
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, true).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
Expand All @@ -144,6 +176,47 @@ pub async fn remote_write(
.into_response())
}

#[axum_macros::debug_handler]
#[tracing::instrument(
skip_all,
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, false).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
)
.into_response())
}

impl IntoResponse for PromStoreResponse {
fn into_response(self) -> axum::response::Response {
let mut header_map = HeaderMap::new();
Expand Down Expand Up @@ -187,6 +260,7 @@ pub async fn remote_read(
async fn decode_remote_write_request_to_row_inserts(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand All @@ -201,14 +275,15 @@ async fn decode_remote_write_request_to_row_inserts(

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf)
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand All @@ -223,7 +298,7 @@ async fn decode_remote_write_request(

let mut request = PromWriteRequest::default();
request
.merge(buf)
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}
Expand Down
Loading