Skip to content

Commit

Permalink
[improve] update victoriametrics and greptime store (#2836)
Browse files Browse the repository at this point in the history
Signed-off-by: tomsun28 <[email protected]>
Signed-off-by: Logic <[email protected]>
Co-authored-by: shown <[email protected]>
Co-authored-by: Logic <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Zhang Yuxuan <[email protected]>
  • Loading branch information
5 people authored Nov 27, 2024
1 parent 509a9c9 commit fc8be06
Show file tree
Hide file tree
Showing 19 changed files with 396 additions and 530 deletions.
6 changes: 3 additions & 3 deletions hertzbeat-manager/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ warehouse:
greptime:
enabled: false
grpc-endpoints: localhost:4001
url: jdbc:mysql://localhost:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true
driver-class-name: com.mysql.cj.jdbc.Driver
http-endpoint: http://localhost:4000
# if you config other database name, you should create them first
database: public
username: greptime
password: greptime
expire-time: 30d
iot-db:
enabled: false
host: 127.0.0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ metrics:
# field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
- field: version
type: 1
label: true
i18n:
zh-CN: 版本
en-US: Version
Expand Down
1 change: 0 additions & 1 deletion hertzbeat-manager/src/main/resources/define/app-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ metrics:
# field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
- field: version
type: 1
label: true
i18n:
zh-CN: 版本
en-US: Version
Expand Down
6 changes: 0 additions & 6 deletions hertzbeat-warehouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@
</exclusion>
</exclusions>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-jdbcdriver.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@
/**
* GrepTimeDB configuration information
*/

@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.WAREHOUSE
+ SignConstants.DOT
+ WarehouseConstants.STORE
+ SignConstants.DOT
+ WarehouseConstants.HistoryName.GREPTIME)
public record GreptimeProperties(@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1:4001") String grpcEndpoints,
@DefaultValue("jdbc:mysql://127.0.0.1:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true") String url,
@DefaultValue("com.mysql.cj.jdbc.Driver") String driverClassName, String username, String password,
// Database TTL, default is 30 days.
@DefaultValue("30d") String expireTime) {
@DefaultValue("127.0.0.1:4001") String grpcEndpoints, @DefaultValue("http://127.0.0.1:4000") String httpEndpoint,
@DefaultValue("public") String database, String username, String password) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hertzbeat.warehouse.store.history.vm;

import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* promql query content
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PromQlQueryContent {

private String status;

private ContentData data;

/**
* content data
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static final class ContentData {

private String resultType;

private List<Content> result;

/**
* content
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static final class Content {

/**
* metric contains metric name plus labels for a particular time series
*/
private Map<String, String> metric;

/**
* values contains raw sample values for the given time series
* value-timestamp
* [1700993195,"436960986"]
*/
private Object[] value;

/**
* values contains raw sample values for the given time series
* value-timestamp list
* [[1700993195,"436960986"],[1700993195,"436960986"]...]
*/
private List<Object[]> values;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public void saveData(CollectRep.MetricsData metricsData) {
return;
}
if (metricsData.getValuesList().isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} is null, ignore.", metricsData.getId());
log.info("[warehouse victoria-metrics] flush metrics data {} {} {} is null, ignore.",
metricsData.getId(), metricsData.getApp(), metricsData.getMetrics());
return;
}
Map<String, String> defaultLabels = new HashMap<>(8);
Expand All @@ -134,8 +135,8 @@ public void saveData(CollectRep.MetricsData metricsData) {
if (metricsData.getApp().startsWith(CommonConstants.PROMETHEUS_APP_PREFIX)) {
isPrometheusAuto = true;
defaultLabels.remove(MONITOR_METRICS_KEY);
defaultLabels.put(LABEL_KEY_JOB,
metricsData.getApp().substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp()
.substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
} else {
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
}
Expand All @@ -145,6 +146,7 @@ public void saveData(CollectRep.MetricsData metricsData) {
Long[] timestamp = new Long[]{metricsData.getTime()};
Map<String, Double> fieldsValue = new HashMap<>(fields.size());
Map<String, String> labels = new HashMap<>(fields.size());
List<VictoriaMetricsDataStorage.VictoriaMetricsContent> contentList = new LinkedList<>();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
fieldsValue.clear();
labels.clear();
Expand Down Expand Up @@ -172,34 +174,47 @@ public void saveData(CollectRep.MetricsData metricsData) {
if (!isPrometheusAuto) {
labels.put(MONITOR_METRIC_KEY, entry.getKey());
}
VictoriaMetricsContent content = VictoriaMetricsContent.builder().metric(labels)
.values(new Double[]{entry.getValue()}).timestamps(timestamp).build();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(vmInsertProps.username()) && StringUtils.hasText(
vmInsertProps.password())) {
String authStr = vmInsertProps.username() + ":" + vmInsertProps.password();
String encodedAuth = new String(
Base64.encodeBase64(authStr.getBytes(StandardCharsets.UTF_8)),
StandardCharsets.UTF_8);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
}
HttpEntity<VictoriaMetricsContent> httpEntity = new HttpEntity<>(content, headers);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(
vmInsertProps.url() + IMPORT_PATH, httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success. {}", content);
} else {
log.error("insert metrics data to victoria-metrics failed. {}", content);
}
VictoriaMetricsDataStorage.VictoriaMetricsContent content = VictoriaMetricsDataStorage.VictoriaMetricsContent.builder()
.metric(new HashMap<>(labels))
.values(new Double[]{entry.getValue()})
.timestamps(timestamp)
.build();
contentList.add(content);
} catch (Exception e) {
log.error("flush metrics data to victoria-metrics error: {}.", e.getMessage(), e);
log.error("combine metrics data error: {}.", e.getMessage(), e);
}

}
}
}
if (contentList.isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} is empty, ignore.", metricsData.getId());
return;
}
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(vmInsertProps.username())
&& StringUtils.hasText(vmInsertProps.password())) {
String authStr = vmInsertProps.username() + ":" + vmInsertProps.password();
String encodedAuth = new String(Base64.encodeBase64(authStr.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC + SignConstants.BLANK + encodedAuth);
}
StringBuilder stringBuilder = new StringBuilder();
for (VictoriaMetricsDataStorage.VictoriaMetricsContent content : contentList) {
stringBuilder.append(JsonUtil.toJson(content)).append("\n");
}
HttpEntity<String> httpEntity = new HttpEntity<>(stringBuilder.toString(), headers);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(vmInsertProps.url() + IMPORT_PATH,
httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success.");
} else {
log.error("insert metrics data to victoria-metrics failed. {}", responseEntity.getBody());
}
} catch (Exception e){
log.error("flush metrics data to victoria-metrics error: {}.", e.getMessage(), e);
}
}

@Override
Expand Down Expand Up @@ -330,15 +345,15 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
URLEncoder.encode("{" + timeSeriesSelector + "}", StandardCharsets.UTF_8))
.queryParam("step", "4h").queryParam("start", startTime).queryParam("end", endTime).build(true)
.toUri();
ResponseEntity<VictoriaMetricsQueryContent> responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, VictoriaMetricsQueryContent.class);
ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from victoria-metrics success. {}", uri);
if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
&& responseEntity.getBody().getData().getResult() != null) {
List<VictoriaMetricsQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
List<PromQlQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
.getResult();
for (VictoriaMetricsQueryContent.ContentData.Content content : contents) {
for (PromQlQueryContent.ContentData.Content content : contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
labels.remove(LABEL_KEY_JOB);
Expand Down Expand Up @@ -368,13 +383,13 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
URLEncoder.encode("max_over_time({" + timeSeriesSelector + "})", StandardCharsets.UTF_8))
.queryParam("step", "4h").queryParam("start", startTime).queryParam("end", endTime).build(true)
.toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, VictoriaMetricsQueryContent.class);
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
&& responseEntity.getBody().getData().getResult() != null) {
List<VictoriaMetricsQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
List<PromQlQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
.getResult();
for (VictoriaMetricsQueryContent.ContentData.Content content : contents) {
for (PromQlQueryContent.ContentData.Content content : contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
labels.remove(LABEL_KEY_JOB);
Expand Down Expand Up @@ -404,13 +419,13 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
URLEncoder.encode("min_over_time({" + timeSeriesSelector + "})", StandardCharsets.UTF_8))
.queryParam("step", "4h").queryParam("start", startTime).queryParam("end", endTime).build(true)
.toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, VictoriaMetricsQueryContent.class);
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
&& responseEntity.getBody().getData().getResult() != null) {
List<VictoriaMetricsQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
List<PromQlQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
.getResult();
for (VictoriaMetricsQueryContent.ContentData.Content content : contents) {
for (PromQlQueryContent.ContentData.Content content : contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
labels.remove(LABEL_KEY_JOB);
Expand Down Expand Up @@ -440,13 +455,13 @@ public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, Str
URLEncoder.encode("avg_over_time({" + timeSeriesSelector + "})", StandardCharsets.UTF_8))
.queryParam("step", "4h").queryParam("start", startTime).queryParam("end", endTime).build(true)
.toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, VictoriaMetricsQueryContent.class);
responseEntity = restTemplate.exchange(uri, HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
&& responseEntity.getBody().getData().getResult() != null) {
List<VictoriaMetricsQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
List<PromQlQueryContent.ContentData.Content> contents = responseEntity.getBody().getData()
.getResult();
for (VictoriaMetricsQueryContent.ContentData.Content content : contents) {
for (PromQlQueryContent.ContentData.Content content : contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
labels.remove(LABEL_KEY_JOB);
Expand Down Expand Up @@ -501,59 +516,4 @@ public static final class VictoriaMetricsContent {
*/
private Long[] timestamps;
}

/**
* victoria metrics query content
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static final class VictoriaMetricsQueryContent {

private String status;

private ContentData data;

/**
* content data
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static final class ContentData {

private String resultType;

private List<Content> result;

/**
* content
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public static final class Content {

/**
* metric contains metric name plus labels for a particular time series
*/
private Map<String, String> metric;

/**
* values contains raw sample values for the given time series
* value-timestamp
* [1700993195,"436960986"]
*/
private Object[] value;

/**
* values contains raw sample values for the given time series
* value-timestamp list
* [[1700993195,"436960986"],[1700993195,"436960986"]...]
*/
private List<Object[]> values;
}
}
}
}
Loading

0 comments on commit fc8be06

Please sign in to comment.