Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate and publish ClickHouse node configs #288

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions orion-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>route53</artifactId>
<version>2.17.273</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.17.273</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.pinterest.orion.core.actions.aws;

import java.util.Map;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3Utils {

public static S3Client getDefaultS3Client() {
return S3Client.builder().build();
}

public static void putObject(
S3Client s3,
String bucket,
String key,
byte[] object,
Map<String, String> metadata) throws S3Exception {
PutObjectRequest putReq = PutObjectRequest.builder()
.bucket(bucket)
.key(key)
.metadata(metadata)
.build();

s3.putObject(putReq, RequestBody.fromBytes(object));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package com.pinterest.orion.core.actions.clickhouse;

import java.util.logging.Logger;
import java.util.Map;
import java.util.List;
import java.util.HashMap;

import java.io.File;
import java.io.StringWriter;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.OutputKeys;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.w3c.dom.Element;

import com.pinterest.orion.core.actions.Action;
import com.pinterest.orion.core.actions.ActionEngine;
import com.pinterest.orion.core.actions.generic.GenericClusterWideAction;
import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.Node;
import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.actions.aws.S3Utils;

import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.S3Client;


public class PublishAllNodeConfigAction extends GenericClusterWideAction.ClusterAction {
private static final Logger logger =
Logger.getLogger(PublishAllNodeConfigAction.class.getCanonicalName());

private static final String SERVERS_TAG = "remote_servers";
private static final String REPLICA_TAG = "replica";
private static final String SHARD_TAG = "shard";
private static final String HOST_TAG = "host";
private static final String PORT_TAG = "port";

private static final String SHARD_PLACEHOLDER = "${SHARD}";
private static final String REPLICA_PLACEHOLDER = "${REPLICA}";

private Element getClusterSection(Document config, String cluster) {
NodeList clusters = config.getElementsByTagName(cluster);
if (clusters.getLength() == 0) {
Element serversSection =
(Element)(config.getElementsByTagName(SERVERS_TAG).item(0));
Element clusterSection = config.createElement(cluster);
serversSection.appendChild(clusterSection);
clusters = config.getElementsByTagName(cluster);
}
return (Element)(clusters.item(0));
}

private Element getShardSection(
Document config, Element clusterSection, int shardNum) {
NodeList shards = clusterSection.getElementsByTagName(SHARD_TAG);
int shardsLength = shards.getLength();
for (int i = 0; i < shardNum - shardsLength; ++i) {
Element shardSection = config.createElement(SHARD_TAG);
clusterSection.appendChild(shardSection);
}

// assuming shard numbers start from 1
Element shardSection = (Element)
((clusterSection.getElementsByTagName(SHARD_TAG)).item(shardNum-1));
return shardSection;
}

private Element getReplicaSection(Document config, String host, int port) {
Element hostSection = config.createElement(HOST_TAG);
hostSection.appendChild(config.createTextNode(host));

Element portSection = config.createElement(PORT_TAG);
portSection.appendChild(config.createTextNode(Integer.toString(port)));

Element replicaSection = config.createElement(REPLICA_TAG);
replicaSection.appendChild(hostSection);
replicaSection.appendChild(portSection);
return replicaSection;
}

private void addReplicaToConfig(
Document config,
int shardNum,
int replicaNum,
String host,
int port,
String cluster) throws Exception {
Element clusterSection = getClusterSection(config, cluster);
Element shardSection = getShardSection(config, clusterSection, shardNum);
Element replicaSection = getReplicaSection(config, host, port);
shardSection.appendChild(replicaSection);
}

private String getConfigString(Document config) throws Exception {
TransformerFactory transformerFactory = TransformerFactory.newInstance();
transformerFactory.setAttribute("indent-number", 4);
Transformer transformer = transformerFactory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");

DOMSource input = new DOMSource(config);
StringWriter sw = new StringWriter();
transformer.transform(input, new StreamResult(sw));
return sw.toString();
}

@Override
public void runAction() throws Exception {
ActionEngine engine = getEngine();
ClickHouseCluster cluster = (ClickHouseCluster)engine.getCluster();
Map<String, Node> nodeMap = cluster.getNodeMap();

String configTemplatePath =
cluster.getAttribute(cluster.CONFIG_TEMPLATE_PATH).getValue();
String configS3Bucket =
cluster.getAttribute(cluster.CONFIG_S3_BUCKET).getValue();

File configTemplate = new File(configTemplatePath);
DocumentBuilder docBuilder =
DocumentBuilderFactory.newInstance().newDocumentBuilder();
Document config = docBuilder.parse(configTemplate);
config.getDocumentElement().normalize();

for (Node node : nodeMap.values()) {
ClickHouseNodeInfo nodeInfo = (ClickHouseNodeInfo)node.getCurrentNodeInfo();
String hostname = nodeInfo.getHostname();
int port = nodeInfo.getServicePort();
List<String> logicalClusters = nodeInfo.getLogicalClusters();
if (logicalClusters.isEmpty()) {
markFailed("Did not find any clusters for node " + hostname);
return;
}
// right now, assume there is only one logical cluster and all
// nodes belong to that
String clusterName = logicalClusters.get(0);
int shardNum = nodeInfo.getShardNum(clusterName);
int replicaNum = nodeInfo.getReplicaNum(clusterName);

// add this node to the config as a replica under the right shard
addReplicaToConfig(config, shardNum, replicaNum, hostname, port, clusterName);
}

String configStr = getConfigString(config);
S3Client s3 = S3Utils.getDefaultS3Client();
for (Node node : nodeMap.values()) {
ClickHouseNodeInfo nodeInfo = (ClickHouseNodeInfo)node.getCurrentNodeInfo();
String clusterName = nodeInfo.getLogicalClusters().get(0);
int shardNum = nodeInfo.getShardNum(clusterName);
int replicaNum = nodeInfo.getReplicaNum(clusterName);

// for the config of each node, sub in the shard and replica number for that node
String nodeConfig = configStr.replace(SHARD_PLACEHOLDER, Integer.toString(shardNum))
.replace(REPLICA_PLACEHOLDER, Integer.toString(replicaNum));

String hostname = nodeInfo.getHostname();
String key = hostname + "/config.xml";
try {
logger.info("Pushing updated config to S3 for node " + hostname);
S3Utils.putObject(
s3,
configS3Bucket,
key,
nodeConfig.getBytes(),
new HashMap<String, String>());
} catch (S3Exception e) {
markFailed("Pushing config to S3 for node " + hostname + " failed: " + e);
s3.close();
return;
}
}
s3.close();
}

@Override
public String getName() {
return "PublishAllNodeConfigAction";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.pinterest.orion.core.automation.operator.clickhouse;

import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.automation.operator.Operator;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;

public abstract class ClickHouseOperator extends Operator {

@Override
public final void operate(Cluster cluster) throws Exception {
if(cluster instanceof ClickHouseCluster){
operate((ClickHouseCluster) cluster);
}
}

public abstract void operate(ClickHouseCluster cluster) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.pinterest.orion.core.automation.operator.clickhouse;

import java.util.logging.Logger;

import com.pinterest.orion.core.Cluster;
import com.pinterest.orion.core.automation.operator.Operator;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction;

public class PublishConfigOperator extends ClickHouseOperator {
private static final Logger logger =
Logger.getLogger(PublishConfigOperator.class.getCanonicalName());

@Override
public void operate(ClickHouseCluster cluster) throws Exception {
logger.info("Initializing PublishAllNodeConfigAction for ClickHouse cluster...");
PublishAllNodeConfigAction action = new PublishAllNodeConfigAction();
dispatch(action);
}

@Override
public String getName() {
return "PublishConfigOperator";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.pinterest.orion.core.PluginConfigurationException;
import com.pinterest.orion.core.clickhouse.ClickHouseCluster;
import com.pinterest.orion.core.clickhouse.ClickHouseNodeInfo;
import com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction;

public class ClickHouseClusterSensor extends ClickHouseSensor {

public static final String CLUSTER_COL = "cluster";
public static final String SHARD_NUM_COL = "shard_num";
public static final String SHARD_WEIGHT_COL = "shard_weight";
public static final String REPLICA_NUM_COL = "replica_num";
public static final String PORT_COL = "port";
public static final String HOST_NAME_COL = "host_name";

public static final String CLUSTERS_QUERY = "SELECT * FROM system.clusters WHERE is_local=1";
Expand Down Expand Up @@ -95,9 +97,11 @@ private void queryShardReplicaInfo(
int shard = r.getValue(SHARD_NUM_COL).asInteger();
int shardWeight = r.getValue(SHARD_WEIGHT_COL).asInteger();
int replicaNum = r.getValue(REPLICA_NUM_COL).asInteger();
int servicePort = r.getValue(PORT_COL).asInteger();
String hostName = r.getValue(HOST_NAME_COL).asString();

nodeInfo.setHostname(hostName);
nodeInfo.setServicePort(servicePort);
nodeInfo.addShardReplicaInfo(cluster, shard, shardWeight, replicaNum);
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ClickHouseCluster extends Cluster {
private static final String DEFAULT_PASSWORD = "";

public static final String SERVERSET_PATH = "serversetPath";
public static final String CONFIG_S3_BUCKET = "configS3Bucket";
public static final String CONFIG_TEMPLATE_PATH = "configTemplatePath";

private Map<String, Object> config;

Expand All @@ -56,6 +58,8 @@ protected void bootstrapClusterInfo(Map<String, Object> config) throws PluginCon
setAttribute(USER, config.getOrDefault(USER, DEFAULT_USER));
setAttribute(PASSWORD, config.getOrDefault(PASSWORD, DEFAULT_PASSWORD));
setAttribute(SERVERSET_PATH, config.get(SERVERSET_PATH));
setAttribute(CONFIG_S3_BUCKET, config.get(CONFIG_S3_BUCKET));
setAttribute(CONFIG_TEMPLATE_PATH, config.get(CONFIG_TEMPLATE_PATH));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import java.io.Serializable;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;

class ShardReplicaInfo {
public int shardNum;
Expand All @@ -30,6 +32,10 @@ public void addShardReplicaInfo(String cluster, int shardNum, int shardWeight, i
infoByCluster.put(cluster, new ShardReplicaInfo(shardNum, shardWeight, replicaNum));
}

public List<String> getLogicalClusters() {
return new ArrayList<String>(infoByCluster.keySet());
}

public int getShardNum(String cluster) {
return infoByCluster.get(cluster).shardNum;
}
Expand Down
20 changes: 19 additions & 1 deletion orion-server/src/test/resources/configs/clickhouse-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,27 @@ clusterConfigs:
configuration:
serversetPath: /opt/orion-server/discovery.testclickhouse.test

clusterConfigs:
- clusterId: testclickhouse
type: clickhouse
configuration:
serversetPath: /opt/orion-server/discovery.testclickhouse.test
configS3Bucket: test_config_bucket
configTemplatePath: /opt/orion-server/conf/config.xml

plugins:
sensorConfigs:
- key: clusterSensor
class: com.pinterest.orion.core.automation.sensor.clickhouse.ClickHouseClusterSensor
interval: 60
enabled: true
enabled: true

actionConfigs:
- key: publishAllNodeConfig
class: com.pinterest.orion.core.actions.clickhouse.PublishAllNodeConfigAction
enabled: true

operatorConfigs:
- key: publishConfigOperator
class: com.pinterest.orion.core.automation.operator.clickhouse.PublishConfigOperator
enabled: true
Loading