Skip to content

Commit

Permalink
Add infra latency etc.
Browse files Browse the repository at this point in the history
commit-id:8fdb7dc7

Fix condition for first deploy
  • Loading branch information
tylerwowen committed Jul 25, 2024
1 parent cddbb72 commit de3d980
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class DBHostDAOImpl implements HostDAO {
private static final String GET_GROUP_SIZE = "SELECT COUNT(host_id) FROM hosts WHERE group_name=?";
private static final String GET_ALL_HOSTS_BY_GROUP = "SELECT * FROM hosts WHERE group_name=? AND state!='TERMINATING'";
private static final String GET_HOST_BY_NAME = "SELECT * FROM hosts WHERE host_name=?";
private static final String GET_HOST_BY_HOSTID = "SELECT * FROM hosts WHERE host_id=?";
private static final String GET_HOST_BY_HOSTID = "SELECT * FROM hosts WHERE host_id=? ORDER BY create_date";
private static final String GET_HOSTS_BY_STATES = "SELECT * FROM hosts WHERE state in (?, ?, ?) GROUP BY host_id ORDER BY last_update";
private static final String GET_GROUP_NAMES_BY_HOST = "SELECT group_name FROM hosts WHERE host_name=?";
private static final String GET_STALE_AGENTLESS_HOST_IDS = "SELECT DISTINCT hosts.host_id FROM hosts LEFT JOIN hosts_and_agents ON hosts.host_id = hosts_and_agents.host_id WHERE hosts.last_update < ? AND hosts_and_agents.host_id IS NULL ORDER BY hosts.last_update DESC LIMIT ?";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,6 +15,9 @@
*/
package com.pinterest.deployservice.handler;

import static com.pinterest.teletraan.universal.metrics.micrometer.PinStatsNamingConvention.CUSTOM_NAME_PREFIX;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -43,6 +46,9 @@
import com.pinterest.deployservice.dao.DeployDAO;
import com.pinterest.deployservice.dao.EnvironDAO;
import com.pinterest.deployservice.dao.HostTagDAO;

import io.micrometer.core.instrument.Metrics;

import com.pinterest.deployservice.dao.DeployConstraintDAO;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -53,6 +59,8 @@ public class GoalAnalyst {
// Notice hotfix and rollback priority should still lower than system service priority
private static final int HOT_FIX_PRIORITY = DeployPriority.HIGHER.getValue() - 20;
private static final int ROLL_BACK_PRIORITY = DeployPriority.HIGHER.getValue() - 10;
private static final String DEPLOY_LATENCY_TIMER_NAME = CUSTOM_NAME_PREFIX + "teletraan.%s.%s.deploy_latency";
private static final String FIRST_DEPLOY_COUNTER_NAME = "first_deploy";

private String host;
private String host_id;
Expand Down Expand Up @@ -320,17 +328,17 @@ AgentState proposeNewAgentState(PingReportBean report, AgentBean agent) {
}

boolean shouldUpdateAgentRecord(AgentBean origBean, AgentBean updateBean) {

if (origBean == null || updateBean == null) {
return true;
}
if (origBean.getHost_id() != null && origBean.getHost_id().equals(host_id) &&
if (origBean.getHost_id() != null && origBean.getHost_id().equals(host_id) &&
origBean.getDeploy_id() != null && origBean.getDeploy_id().equals(updateBean.getDeploy_id()) &&
origBean.getEnv_id() != null && origBean.getEnv_id().equals(updateBean.getEnv_id()) &&
origBean.getEnv_id() != null && origBean.getEnv_id().equals(updateBean.getEnv_id()) &&
origBean.getFail_count() != null && origBean.getFail_count().equals(updateBean.getFail_count()) &&
origBean.getStatus() != null && origBean.getStatus().equals(updateBean.getStatus()) &&
origBean.getStatus() != null && origBean.getStatus().equals(updateBean.getStatus()) &&
origBean.getLast_err_no() != null && origBean.getLast_err_no().equals(updateBean.getLast_err_no()) &&
origBean.getState() != null && origBean.getState().equals(updateBean.getState()) &&
origBean.getState() != null && origBean.getState().equals(updateBean.getState()) &&
origBean.getDeploy_stage() != null && origBean.getDeploy_stage().equals(updateBean.getDeploy_stage()) &&
origBean.getContainer_health_status() != null && origBean.getContainer_health_status().equals(updateBean.getContainer_health_status())) {
LOG.debug("Skip updating agent record for env_id {}, deploy_id {} on host {}",
Expand All @@ -346,18 +354,19 @@ boolean shouldUpdateAgentRecord(AgentBean origBean, AgentBean updateBean) {
// We populate all the fields, since this could be used for insertOrUpdate as well
AgentBean genUpdateBeanByReport(PingReportBean report, AgentBean agent) {
// We generate complete bean in case we need to insertOrUpdate it into agents table
long currentTime = System.currentTimeMillis();
AgentBean updateBean = new AgentBean();
updateBean.setHost_name(host);
updateBean.setHost_id(host_id);
updateBean.setDeploy_id(report.getDeployId());
updateBean.setEnv_id(report.getEnvId());
updateBean.setLast_update(System.currentTimeMillis());
updateBean.setLast_update(currentTime);
updateBean.setLast_operator(Constants.SYSTEM_OPERATOR);
updateBean.setFail_count(report.getFailCount());
updateBean.setStatus(report.getAgentStatus());
updateBean.setLast_err_no(report.getErrorCode());
updateBean.setState(proposeNewAgentState(report, agent));
updateBean.setStage_start_date(System.currentTimeMillis());
updateBean.setStage_start_date(currentTime);
updateBean.setDeploy_stage(report.getDeployStage());
if (report.getContainerHealthStatus() == null) {
updateBean.setContainer_health_status("");
Expand All @@ -368,22 +377,38 @@ AgentBean genUpdateBeanByReport(PingReportBean report, AgentBean agent) {
if (agent == null) {
// if agent is missing in agent table, treat it as not first_deploy.
updateBean.setFirst_deploy(false);
updateBean.setStart_date(System.currentTimeMillis());
updateBean.setStart_date(currentTime);
} else {
updateBean.setFirst_deploy(agent.getFirst_deploy());
updateBean.setStart_date(agent.getStart_date());
}

if (report.getDeployStage() == DeployStage.SERVING_BUILD) {
if (report.getDeployStage() == DeployStage.SERVING_BUILD && updateBean.getFirst_deploy()) {
// turn off first deploy flag
updateBean.setFirst_deploy(false);
updateBean.setFirst_deploy_time(System.currentTimeMillis());
updateBean.setFirst_deploy_time(currentTime);
emitMetrics(updateBean);
}

// TODO record error message as well if errorCode != 0
return updateBean;
}

private void emitMetrics(AgentBean updateBean) {
try {
EnvironBean env = envs.get(updateBean.getEnv_id());
Metrics.timer(String.format(DEPLOY_LATENCY_TIMER_NAME, env.getEnv_name(), env.getStage_name()))
.record(Duration.ofMillis(updateBean.getFirst_deploy_time() - updateBean.getStart_date()));
Metrics.counter(FIRST_DEPLOY_COUNTER_NAME, "success",
String.valueOf(updateBean.getStatus().equals(AgentStatus.SUCCEEDED)),
"env", env.getEnv_name(),
"stage", env.getStage_name())
.increment();
} catch (Exception ex) {
LOG.warn("Failed to emit metrics of {}", updateBean.toString(), ex);
}
}

// Generate new agent bean based on the report & current agent record,
// This is intended to be used for deploy goal to install next stage
AgentBean genNextStageUpdateBean(EnvironBean env, PingReportBean report, AgentBean agent) {
Expand Down Expand Up @@ -570,8 +595,8 @@ void process(String envId, EnvironBean env, PingReportBean report, AgentBean age
hostTagBean.setCreate_date(System.currentTimeMillis());
hostTagDAO.insertOrUpdate(hostTagBean);
LOG.info("Update host tags from Deployd: update host_tags with env id {}, host id {}, tag name {}, tag value {}", envId, host_id, tagName, tagValue);
}
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package com.pinterest.deployservice.handler;

import static com.pinterest.teletraan.universal.metrics.micrometer.PinStatsNamingConvention.CUSTOM_NAME_PREFIX;

import java.sql.Connection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -52,6 +55,7 @@
import com.pinterest.deployservice.bean.EnvType;
import com.pinterest.deployservice.bean.EnvironBean;
import com.pinterest.deployservice.bean.HostAgentBean;
import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.bean.HostState;
import com.pinterest.deployservice.bean.HostTagBean;
import com.pinterest.deployservice.bean.OpCode;
Expand Down Expand Up @@ -80,6 +84,8 @@
import com.pinterest.deployservice.dao.UtilDAO;
import com.pinterest.deployservice.pingrequests.PingRequestValidator;

import io.micrometer.core.instrument.Metrics;

/**
* This is where we handle agent ping and return deploy goal!
*/
Expand All @@ -88,7 +94,7 @@ public class PingHandler {
private static final PingResponseBean NOOP;
private static final Set<String> EMPTY_GROUPS;
private static final String PINTEREST_MAIN_AWS_ACCOUNT = "998131032990";
//private static final long AGENT_COUNT_CACHE_TTL = 5 * 1000;
private static final String PROVISION_LATENCY_TIMER_NAME = CUSTOM_NAME_PREFIX + "teletraan.%s.provision_latency";

static {
NOOP = new PingResponseBean();
Expand Down Expand Up @@ -140,7 +146,7 @@ public PingHandler(ServiceContext serviceContext) {
agentCountCacheTtl = serviceContext.getAgentCountCacheTtl();
maxParallelThreshold = serviceContext.getMaxParallelThreshold();
accountAllowList = serviceContext.getAccountAllowList();

if (serviceContext.isBuildCacheEnabled()) {
buildCache = CacheBuilder.from(serviceContext.getBuildCacheSpec().replace(";", ","))
.build(new CacheLoader<String, BuildBean>() {
Expand Down Expand Up @@ -180,13 +186,13 @@ void updateHosts(String hostName, String hostIp, String hostId, Set<String> grou
groupsToAdd.add(group);
}
}

// if it is the main account, don't update it to avoid huge updates for existing hosts
// only update sub account id for existing hosts
if (groupsToAdd.size() > 0 || accountId != null && !accountId.equals(PINTEREST_MAIN_AWS_ACCOUNT) && !accountId.equals(recordedAccountId)) {
hostDAO.insertOrUpdate(hostName, hostIp, hostId, HostState.ACTIVE.toString(), groups, accountId);
}

// Remove if not reported
for (String recordedGroup : recordedGroups) {
if (!groups.contains(recordedGroup)) {
Expand All @@ -198,27 +204,45 @@ void updateHosts(String hostName, String hostIp, String hostId, Set<String> grou

void updateHostStatus(String hostId, String hostName, String hostIp, String agentVersion, String asg) throws Exception {
HostAgentBean hostAgentBean = hostAgentDAO.getHostById(hostId);
long current_time = System.currentTimeMillis();
long currentTime = System.currentTimeMillis();
boolean isExisting = true;
if (hostAgentBean == null) {
hostAgentBean = new HostAgentBean();
hostAgentBean.setHost_id(hostId);
hostAgentBean.setCreate_date(current_time);
hostAgentBean.setCreate_date(currentTime);
isExisting = false;
}
hostAgentBean.setHost_name(hostName);
hostAgentBean.setIp(hostIp);
hostAgentBean.setLast_update(current_time);
hostAgentBean.setLast_update(currentTime);
hostAgentBean.setAgent_Version(agentVersion);
hostAgentBean.setAuto_scaling_group(asg);
hostAgentBean.setAuto_scaling_group(asg);

if (!isExisting) {
hostAgentDAO.insert(hostAgentBean);
// First ping
hostAgentDAO.insert(hostAgentBean);
emitInfraLatency(currentTime, hostId, asg);
} else {
hostAgentDAO.update(hostId, hostAgentBean);
}
}

void emitInfraLatency(long currentTime, String hostId, String asg) {
try {
List<HostBean> hosts = hostDAO.getHostsByHostId(hostId);
if (hosts.size() == 0) {
LOG.warn("No host found for hostId {}, skip", hostId);
return;
}
String timerName = String.format(PROVISION_LATENCY_TIMER_NAME, asg);
HostBean initialHost = hosts.get(0);
long infraLatency = currentTime - initialHost.getCreate_date();
Metrics.timer(timerName).record(Duration.ofMillis(infraLatency));
} catch (Exception e) {
LOG.warn("Failed to emit infra latency for " + hostId, e);
}
}

void deleteAgentSafely(String hostId, String envId) {
try {
LOG.debug("Delete agent {}/{} record.", hostId, envId);
Expand Down Expand Up @@ -274,7 +298,7 @@ boolean isAgentCountValid(String envId, AgentCountBean agentCountBean) {
return true;
}


/**
* Check if we can start deploy on host for certain env. We should not allow
* more than parallelThreshold hosts in install in the same time
Expand Down Expand Up @@ -356,8 +380,8 @@ boolean canDeploy(EnvironBean envBean, String host, AgentBean agentBean) throws
long now = System.currentTimeMillis();
agentCountBean.setLast_refresh(now);
}
/* Typically, should update agentCount and agent in transaction,
* however, treating agentCount as cache w/ ttl and
/* Typically, should update agentCount and agent in transaction,
* however, treating agentCount as cache w/ ttl and
* make sure we update count first and then agent state.
*/
LOG.debug("updating count for envId {}, existing_count {}, active_count {}, last_refresh {}, ttl {} ms",
Expand Down Expand Up @@ -586,20 +610,20 @@ private EnvType populateStageType(PingRequestBean pingRequest) throws Exception

EnvironBean envBean = populateEnviron(pingRequest.getAutoscalingGroup());
if (envBean != null && envBean.getStage_type() != EnvType.DEFAULT) {
return envBean.getStage_type();
return envBean.getStage_type();
}

// Search for stage type from groups like CMP,group
Set<String> groups = pingRequest.getGroups();
for (String group: groups) {
envBean = populateEnviron(group);
if (envBean != null && envBean.getStage_type() != EnvType.DEFAULT) {
return envBean.getStage_type();
}
}
}
return EnvType.PRODUCTION;
}

// Creates composite deploy group. size is limited by group_name size in hosts table.
// TODO: Consider storing host <-> shard mapping separately.
private Set<String> shardGroups(PingRequestBean pingRequest) throws Exception {
Expand Down Expand Up @@ -654,9 +678,9 @@ public PingResult ping(PingRequestBean pingRequest, boolean rate_limited) throws
Map<String, String> tags = mapper.readValue(ec2Tags, Map.class);
for (Map.Entry<String, String> entry : tags.entrySet()) {
LOG.debug("key: {}, val: {}", entry.getKey(), entry.getValue());
}
}
}

//update agent version for host
String agentVersion = pingRequest.getAgentVersion() != null ? pingRequest.getAgentVersion() : "UNKNOWN";

Expand Down Expand Up @@ -697,9 +721,9 @@ public PingResult ping(PingRequestBean pingRequest, boolean rate_limited) throws
if (installCandidate.needWait) {
LOG.debug("Checking if host {}, updateBean = {}, rate_limited = {}, system_priority = {} can deploy",
hostName, updateBean, rate_limited, env.getSystem_priority());
// Request has hit LWM rate-limit. we already updated heartbeat.
// Request has hit LWM rate-limit. we already updated heartbeat.
// Next, see if we can handle light-weight deploys, instead of completly discarding request.
// Idea is,
// Idea is,
// 1. we want to continue in-progress deploy.
// 2. delay starting new deploy on the host(canDeploy call below is expensive for services with system priority).
// 3. allow any light weight deploys.
Expand All @@ -719,7 +743,7 @@ public PingResult ping(PingRequestBean pingRequest, boolean rate_limited) throws
break;
} else {
LOG.debug("Host {} needs to wait for env {}. Try next env",
hostName, updateBean.getEnv_id());
hostName, updateBean.getEnv_id());
}
} else {
LOG.debug("Host {} is in the middle of deploy, no need to wait, updateBean = {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -315,6 +315,7 @@ public void testFirstTimeDeployPostRestart() throws Exception {

AgentBean agent = genDefaultAgent();
agent.setFirst_deploy(true);
agent.setStart_date(0L);
agents.put(agent.getEnv_id(), agent);
GoalAnalyst analyst = new GoalAnalyst(null, null, null, null, "foo", "id-1", envs, reports, agents, null);
analyst.analysis();
Expand Down Expand Up @@ -348,6 +349,7 @@ public void testFirstTimeDeployEnd() throws Exception {

AgentBean agent = genDefaultAgent();
agent.setFirst_deploy(true);
agent.setStart_date(0L);
agents.put(agent.getEnv_id(), agent);
GoalAnalyst analyst = new GoalAnalyst(null, null, null, null, "foo", "id-1", envs, reports, agents, null);
analyst.analysis();
Expand Down

0 comments on commit de3d980

Please sign in to comment.