Skip to content

Commit

Permalink
chore: make clippy happy
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Aug 1, 2024
1 parent dc83442 commit 5cf7a8e
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 50 deletions.
16 changes: 10 additions & 6 deletions src/sinks/greptimedb/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
http_healthcheck, GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder,
PartitionKey,
},
sink::GreptimeDBLogsHttpSink,
sink::{GreptimeDBLogsHttpSink, LogsSinkSetting},
},
GreptimeDBDefaultBatchSettings,
},
Expand Down Expand Up @@ -157,15 +157,19 @@ impl SinkConfig for GreptimeDBLogsConfig {
.unwrap_or("http")
.to_string();

let logs_sink_setting = LogsSinkSetting {
dbname: self.dbname.clone(),
table: self.table.clone(),
pipeline_name: self.pipeline_name.clone(),
pipeline_version: self.pipeline_version.clone(),
protocol,
};

let sink = GreptimeDBLogsHttpSink::new(
self.batch.into_batcher_settings()?,
service,
self.dbname.clone(),
self.table.clone(),
self.pipeline_name.clone(),
self.pipeline_version.clone(),
request_builder,
protocol,
logs_sink_setting,
);

let healthcheck = Box::pin(http_healthcheck(
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/greptimedb/logs/http_request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ fn prepare_log_ingester_url(
let mut url = url::Url::parse(&path).unwrap();
let mut url_builder = url.query_pairs_mut();
url_builder
.append_pair("db", &db)
.append_pair("table", &table)
.append_pair("db", db)
.append_pair("table", table)
.append_pair("pipeline_name", &metadata.pipeline_name);

if let Some(pipeline_version) = metadata.pipeline_version.as_ref() {
Expand All @@ -135,7 +135,7 @@ impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilde
self.endpoint.as_str(),
&db,
&table,
&metadata,
metadata,
&self.extra_params,
);

Expand Down Expand Up @@ -271,7 +271,7 @@ mod tests {
.collect(),
);

let url = prepare_log_ingester_url(&endpoint, db, table, &metadata, &extra_params);
let url = prepare_log_ingester_url(endpoint, db, table, &metadata, &extra_params);
let url = url::Url::parse(&url).unwrap();
let check = url.query_pairs().all(|(k, v)| match k.as_ref() {
"db" => v == "test_db",
Expand Down
36 changes: 16 additions & 20 deletions src/sinks/greptimedb/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ use crate::sinks::{
util::http::HttpRequest,
};

pub struct LogsSinkSetting {
pub dbname: Template,
pub table: Template,
pub pipeline_name: Template,
pub pipeline_version: Option<Template>,
pub protocol: String,
}

/// A sink that ingests logs into GreptimeDB.
pub struct GreptimeDBLogsHttpSink<S> {
batcher_settings: BatcherSettings,
service: S,
dbname: Template,
table: Template,
pipeline_name: Template,
pipeline_version: Option<Template>,
request_builder: GreptimeDBLogsHttpRequestBuilder,
protocol: String,
logs_sink_setting: LogsSinkSetting,
}

impl<S> GreptimeDBLogsHttpSink<S>
Expand All @@ -28,22 +32,14 @@ where
pub const fn new(
batcher_settings: BatcherSettings,
service: S,
dbname: Template,
table: Template,
pipeline_name: Template,
pipeline_version: Option<Template>,
request_builder: GreptimeDBLogsHttpRequestBuilder,
protocol: String,
logs_sink_setting: LogsSinkSetting,
) -> Self {
Self {
batcher_settings,
service,
dbname,
table,
pipeline_name,
pipeline_version,
request_builder,
protocol,
logs_sink_setting,
}
}

Expand All @@ -52,10 +48,10 @@ where
input
.batched_partitioned(
KeyPartitioner::new(
self.dbname,
self.table,
self.pipeline_name,
self.pipeline_version,
self.logs_sink_setting.dbname,
self.logs_sink_setting.table,
self.logs_sink_setting.pipeline_name,
self.logs_sink_setting.pipeline_version,
),
|| batcher_settings.as_byte_size_config(),
)
Expand All @@ -74,7 +70,7 @@ where
}
})
.into_driver(self.service)
.protocol(self.protocol)
.protocol(self.logs_sink_setting.protocol)
.run()
.await
}
Expand Down
21 changes: 12 additions & 9 deletions src/sinks/greptimedb/metrics/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use super::request::GreptimeDBGrpcRetryLogic;
use super::{
service::{healthcheck, GreptimeDBGrpcService},
sink,
};
use crate::sinks::{
greptimedb::{default_dbname, GreptimeDBDefaultBatchSettings},
greptimedb::{
default_dbname,
metrics::{
request::GreptimeDBGrpcRetryLogic,
service::{healthcheck, GreptimeDBGrpcService},
sink,
},
GreptimeDBDefaultBatchSettings,
},
prelude::*,
};
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};

/// Configuration for the `logdna` sink.
/// Configuration for the `greptimedb` sink.
#[configurable_component(sink("greptimedb", "Ingest metrics data into GreptimeDB."))]
#[configurable(metadata(
deprecated = "The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead."
Expand All @@ -24,7 +27,7 @@ impl GenerateConfig for GreptimeDBConfig {
}

#[async_trait::async_trait]
#[typetag::serde(name = "logdna")]
#[typetag::serde(name = "greptimedb")]
impl SinkConfig for GreptimeDBConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
warn!("DEPRECATED: The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead.");
Expand Down Expand Up @@ -110,7 +113,7 @@ pub struct GreptimeDBMetricsConfig {

impl_generate_config_from_default!(GreptimeDBMetricsConfig);

#[typetag::serde(name = "greptimedb")]
#[typetag::serde(name = "greptimedb_metrics")]
#[async_trait::async_trait]
impl SinkConfig for GreptimeDBMetricsConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/greptimedb/metrics/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{batch::GreptimeDBBatchSizer, request_builder::metric_to_insert_request};
use crate::sinks::prelude::*;
use crate::sinks::{
greptimedb::metrics::{batch::GreptimeDBBatchSizer, request_builder::metric_to_insert_request},
prelude::*,
};
use greptimedb_ingester::{api::v1::*, Error as GreptimeError};
use std::num::NonZeroUsize;
use vector_lib::event::Metric;
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/greptimedb/metrics/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{
config::GreptimeDBMetricsConfig,
request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
use crate::sinks::{
greptimedb::metrics::config::GreptimeDBMetricsConfig,
greptimedb::metrics::request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
prelude::*,
};
use crate::sinks::prelude::*;
use greptimedb_ingester::{
api::v1::auth_header::AuthScheme, api::v1::*, channel_manager::*, Client, ClientBuilder,
Compression, Database, Error as GreptimeError,
Expand Down
8 changes: 4 additions & 4 deletions src/sinks/greptimedb/metrics/sink.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{
batch::GreptimeDBBatchSizer, request::GreptimeDBGrpcRetryLogic, service::GreptimeDBGrpcService,
};
use crate::sinks::{
greptimedb::metrics::request::GreptimeDBGrpcRequest,
greptimedb::metrics::{
batch::GreptimeDBBatchSizer, request::GreptimeDBGrpcRequest,
request::GreptimeDBGrpcRetryLogic, service::GreptimeDBGrpcService,
},
prelude::*,
util::buffer::metrics::{MetricNormalize, MetricSet},
};
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub mod gcp;
pub mod gcp_chronicle;
#[cfg(any(feature = "sinks-gcp-chronicle", feature = "sinks-gcp"))]
pub mod gcs_common;
#[cfg(feature = "sinks-greptimedb")]
#[cfg(any(feature = "sinks-greptimedb_metrics", feature = "sink-greptimedb_logs"))]
pub mod greptimedb;
#[cfg(feature = "sinks-honeycomb")]
pub mod honeycomb;
Expand Down

0 comments on commit 5cf7a8e

Please sign in to comment.