Skip to content

Commit

Permalink
feat: support decode gzip if influxdb write specify it (#3494)
Browse files Browse the repository at this point in the history
* feat: support dedoce gzip if influxdb write specify it

Signed-off-by: tison <[email protected]>

* address comments

Signed-off-by: tison <[email protected]>

* simplify with tower_http DecompressionLayer

Signed-off-by: tison <[email protected]>

* tidy some code

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Mar 14, 2024
1 parent b85d7bb commit 61f0703
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
19 changes: 10 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
url = "2.3.1"
uuid.workspace = true

Expand Down
2 changes: 1 addition & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
tonic-reflection = "0.10"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
urlencoding = "2.1"

[target.'cfg(not(windows))'.dependencies]
Expand Down
24 changes: 16 additions & 8 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,25 @@ pub enum Error {
#[snafu(display("Not found influx http authorization info"))]
NotFoundInfluxAuth {},

#[snafu(display("Unsupported http auth scheme, name: {}", name))]
UnsupportedAuthScheme { name: String },

#[snafu(display("Invalid visibility ASCII chars"))]
InvisibleASCII {
InvalidAuthHeaderInvisibleASCII {
#[snafu(source)]
error: hyper::header::ToStrError,
location: Location,
},

#[snafu(display("Unsupported http auth scheme, name: {}", name))]
UnsupportedAuthScheme { name: String },
#[snafu(display("Invalid utf-8 value"))]
InvalidAuthHeaderInvalidUtf8Value {
#[snafu(source)]
error: FromUtf8Error,
location: Location,
},

#[snafu(display("Invalid http authorization header"))]
InvalidAuthorizationHeader { location: Location },
InvalidAuthHeader { location: Location },

#[snafu(display("Invalid base64 value"))]
InvalidBase64Value {
Expand Down Expand Up @@ -520,16 +527,17 @@ impl ErrorExt for Error {
DescribeStatement { source } => source.status_code(),

NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
InvisibleASCII { .. }
InvalidAuthHeaderInvisibleASCII { .. }
| UnsupportedAuthScheme { .. }
| InvalidAuthorizationHeader { .. }
| InvalidAuthHeader { .. }
| InvalidBase64Value { .. }
| InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
| InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,

DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.status_code(),
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,

InvalidUtf8Value { .. } | InvalidFlushArgument { .. } => StatusCode::InvalidArguments,

ReplacePreparedStmtParams { source, .. }
| GetPreparedStmtParams { source, .. }
Expand Down
6 changes: 6 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::timeout::TimeoutLayer;
use tower::ServiceBuilder;
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;

use self::authorize::AuthState;
Expand Down Expand Up @@ -698,6 +699,11 @@ impl HttpServer {
Router::new()
.route("/write", routing::post(influxdb_write_v1))
.route("/api/v2/write", routing::post(influxdb_write_v2))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)
Expand Down
33 changes: 13 additions & 20 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
use super::{ResponseFormat, PUBLIC_APIS};
use crate::error::{
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::error_result::ErrorResponse;
Expand Down Expand Up @@ -174,15 +174,13 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
// try header
let (auth_scheme, credential) = header
.to_str()
.context(InvisibleASCIISnafu)?
.context(InvalidAuthHeaderInvisibleASCIISnafu)?
.split_once(' ')
.context(InvalidAuthorizationHeaderSnafu)?;
.context(InvalidAuthHeaderSnafu)?;

let (username, password) = match auth_scheme.to_lowercase().as_str() {
"token" => {
let (u, p) = credential
.split_once(':')
.context(InvalidAuthorizationHeaderSnafu)?;
let (u, p) = credential.split_once(':').context(InvalidAuthHeaderSnafu)?;
(u.to_string(), p.to_string().into())
}
"basic" => decode_basic(credential)?,
Expand Down Expand Up @@ -237,13 +235,10 @@ impl TryFrom<&str> for AuthScheme {
type Error = error::Error;

fn try_from(value: &str) -> Result<Self> {
let (scheme, encoded_credentials) = value
.split_once(' ')
.context(InvalidAuthorizationHeaderSnafu)?;
ensure!(
!encoded_credentials.contains(' '),
InvalidAuthorizationHeaderSnafu
);
let (scheme, encoded_credentials) =
value.split_once(' ').context(InvalidAuthHeaderSnafu)?;

ensure!(!encoded_credentials.contains(' '), InvalidAuthHeaderSnafu);

match scheme.to_lowercase().as_str() {
"basic" => decode_basic(encoded_credentials)
Expand All @@ -261,7 +256,7 @@ fn auth_header<B>(req: &Request<B>) -> Result<AuthScheme> {
.get(http::header::AUTHORIZATION)
.context(error::NotFoundAuthHeaderSnafu)?
.to_str()
.context(InvisibleASCIISnafu)?;
.context(InvalidAuthHeaderInvisibleASCIISnafu)?;

auth_header.try_into()
}
Expand All @@ -270,13 +265,14 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
let decoded = BASE64_STANDARD
.decode(credential)
.context(error::InvalidBase64ValueSnafu)?;
let as_utf8 = String::from_utf8(decoded).context(error::InvalidUtf8ValueSnafu)?;
let as_utf8 =
String::from_utf8(decoded).context(error::InvalidAuthHeaderInvalidUtf8ValueSnafu)?;

if let Some((user_id, password)) = as_utf8.split_once(':') {
return Ok((user_id.to_string(), password.to_string().into()));
}

InvalidAuthorizationHeaderSnafu {}.fail()
InvalidAuthHeaderSnafu {}.fail()
}

fn need_auth<B>(req: &Request<B>) -> bool {
Expand Down Expand Up @@ -395,10 +391,7 @@ mod tests {

let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap();
let res = auth_header(&wrong_req);
assert_matches!(
res.err(),
Some(error::Error::InvalidAuthorizationHeader { .. })
);
assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. }));

let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
let res = auth_header(&wrong_req);
Expand Down

0 comments on commit 61f0703

Please sign in to comment.