Skip to content

Commit

Permalink
feat: reduce a clone of string (#3422)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Mar 3, 2024
1 parent 8609977 commit 0edf1bb
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/servers/src/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl TryFrom<InfluxdbRequest> for RowInsertRequests {

// tags
if let Some(tags) = tags {
let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.as_str()));
let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string()));
row_writer::write_tags(table_data, kvs, &mut one_row)?;
}

Expand Down
49 changes: 37 additions & 12 deletions src/servers/src/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe

let mut multi_table_data = MultiTableData::new();

for series in &request.timeseries {
for series in request.timeseries {
let table_name = &series
.labels
.iter()
Expand All @@ -326,29 +326,54 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe
series.samples.len(),
);

for Sample { value, timestamp } in &series.samples {
// labels
let kvs = series.labels.into_iter().filter_map(|label| {
if label.name == METRIC_NAME_LABEL {
None
} else {
Some((label.name, label.value))
}
});

if series.samples.len() == 1 {
let mut one_row = table_data.alloc_one_row();

// labels
let kvs = series.labels.iter().filter_map(|label| {
if label.name == METRIC_NAME_LABEL {
None
} else {
Some((label.name.to_string(), label.value.as_str()))
}
});
row_writer::write_tags(table_data, kvs, &mut one_row)?;
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
row_writer::write_f64(
table_data,
GREPTIME_VALUE,
series.samples[0].value,
&mut one_row,
)?;
// timestamp
row_writer::write_ts_millis(
table_data,
GREPTIME_TIMESTAMP,
Some(*timestamp),
Some(series.samples[0].timestamp),
&mut one_row,
)?;

table_data.add_row(one_row);
} else {
for Sample { value, timestamp } in &series.samples {
let mut one_row = table_data.alloc_one_row();

// labels
let kvs = kvs.clone();
row_writer::write_tags(table_data, kvs, &mut one_row)?;
// value
row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
table_data,
GREPTIME_TIMESTAMP,
Some(*timestamp),
&mut one_row,
)?;

table_data.add_row(one_row);
}
}
}

Expand Down
50 changes: 25 additions & 25 deletions src/servers/src/row_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,10 @@ impl MultiTableData {

pub fn write_tags(
table_data: &mut TableData,
kvs: impl Iterator<Item = (String, impl ToString)>,
kvs: impl Iterator<Item = (String, String)>,
one_row: &mut Vec<Value>,
) -> Result<()> {
let ktv_iter = kvs.map(|(k, v)| {
(
k,
ColumnDataType::String,
ValueData::StringValue(v.to_string()),
)
});
let ktv_iter = kvs.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v)));
write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
}

Expand Down Expand Up @@ -209,18 +203,22 @@ fn write_by_semantic_type(
} = table_data;

for (name, datatype, value) in ktv_iter {
let index = column_indexes.entry(name.clone()).or_insert(schema.len());
if *index == schema.len() {
let index = column_indexes.get(&name);
if let Some(index) = index {
check_schema(datatype, semantic_type, &schema[*index])?;
one_row[*index].value_data = Some(value);
} else {
let index = schema.len();
schema.push(ColumnSchema {
column_name: name,
column_name: name.clone(),
datatype: datatype as i32,
semantic_type: semantic_type as i32,
..Default::default()
});
one_row.push(value.into());
} else {
check_schema(datatype, semantic_type, &schema[*index])?;
one_row[*index].value_data = Some(value);
column_indexes.insert(name, index);
one_row.push(Value {
value_data: Some(value),
});
}
}

Expand Down Expand Up @@ -264,22 +262,24 @@ pub fn write_ts_precision(
}
};

let index = column_indexes.entry(name.clone()).or_insert(schema.len());
if *index == schema.len() {
schema.push(ColumnSchema {
column_name: name,
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
});
one_row.push(ValueData::TimestampMillisecondValue(ts).into())
} else {
let index = column_indexes.get(&name);
if let Some(index) = index {
check_schema(
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
&schema[*index],
)?;
one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts));
} else {
let index = schema.len();
schema.push(ColumnSchema {
column_name: name.clone(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
});
column_indexes.insert(name, index);
one_row.push(ValueData::TimestampMillisecondValue(ts).into())
}

Ok(())
Expand Down

0 comments on commit 0edf1bb

Please sign in to comment.