Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AgentJanitor Rework #1217

Merged
merged 17 commits into from
Jul 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,6 +17,7 @@

import com.pinterest.deployservice.bean.HostBean;

import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -55,6 +56,8 @@ public interface HostDAO {

List<HostBean> getTerminatingHosts() throws Exception;

/**
 * Returns the ids of hosts that have not been updated since the given time
 * and that have no associated agent record.
 *
 * @param noUpdateSince cutoff for the host's last update; only hosts updated
 *                      before this value qualify
 * @param limit maximum number of host ids to return
 * @return ids of the matching hosts
 * @throws SQLException if the lookup fails
 */
List<String> getStaleAgentlessHostIds(long noUpdateSince, int limit) throws SQLException;

Collection<HostBean> getHostsByEnvId(String envId) throws Exception;

HostBean getByEnvIdAndHostId(String envId, String hostId) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.dbutils.handlers.BeanHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;

import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
Expand All @@ -51,8 +52,7 @@ public class DBHostDAOImpl implements HostDAO {
private static final String GET_HOST_BY_HOSTID = "SELECT * FROM hosts WHERE host_id=?";
private static final String GET_HOSTS_BY_STATES = "SELECT * FROM hosts WHERE state in (?, ?, ?) GROUP BY host_id ORDER BY last_update";
private static final String GET_GROUP_NAMES_BY_HOST = "SELECT group_name FROM hosts WHERE host_name=?";
private static final String GET_STALE_ENV_HOST = "SELECT DISTINCT hosts.* FROM hosts INNER JOIN hosts_and_envs ON hosts.host_name=hosts_and_envs.host_name WHERE hosts.last_update<?";
private static final String GET_STALE_HOST = "SELECT DISTINCT hosts.* FROM hosts WHERE hosts.last_update<?";
private static final String GET_STALE_AGENTLESS_HOST_IDS = "SELECT DISTINCT hosts.host_id FROM hosts LEFT JOIN hosts_and_agents ON hosts.host_id = hosts_and_agents.host_id WHERE hosts.last_update < ? AND hosts_and_agents.host_id IS NULL ORDER BY hosts.last_update DESC LIMIT ?";
private static final String GET_HOST_NAMES_BY_GROUP = "SELECT host_name FROM hosts WHERE group_name=?";
private static final String GET_HOST_IDS_BY_GROUP = "SELECT DISTINCT host_id FROM hosts WHERE group_name=?";
private static final String GET_HOSTS_BY_ENVID = "SELECT h.* FROM hosts h INNER JOIN groups_and_envs ge ON ge.group_name = h.group_name WHERE ge.env_id=? UNION DISTINCT SELECT hs.* FROM hosts hs INNER JOIN hosts_and_envs he ON he.host_name = hs.host_name WHERE he.env_id=?";
Expand Down Expand Up @@ -194,6 +194,12 @@ public List<HostBean> getTerminatingHosts() throws Exception {
HostState.TERMINATING.toString(), HostState.PENDING_TERMINATE_NO_REPLACE.toString());
}

/**
 * Runs GET_STALE_AGENTLESS_HOST_IDS to find hosts whose last update is older
 * than {@code noUpdateSince} and that have no matching row in hosts_and_agents.
 *
 * @param noUpdateSince exclusive upper bound for the host's last_update value
 * @param limit maximum number of ids to return
 * @return matching host ids, ordered by last_update descending
 * @throws SQLException if the query fails
 */
@Override
public List<String> getStaleAgentlessHostIds(long noUpdateSince, int limit) throws SQLException {
    QueryRunner runner = new QueryRunner(dataSource);
    return runner.query(
            GET_STALE_AGENTLESS_HOST_IDS,
            SingleResultSetHandlerFactory.<String>newListObjectHandler(),
            noUpdateSince,
            limit);
}

@Override
public List<HostBean> getAllActiveHostsByGroup(String groupName) throws Exception {
ResultSetHandler<List<HostBean>> h = new BeanListHandler<>(HostBean.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package com.pinterest.deployservice.handler;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pinterest.deployservice.ServiceContext;
import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.common.CommonUtils;
import com.pinterest.deployservice.dao.AgentDAO;
import com.pinterest.deployservice.dao.HostDAO;
import com.pinterest.deployservice.dao.HostAgentDAO;
import com.pinterest.deployservice.dao.HostDAO;
import com.pinterest.deployservice.dao.HostTagDAO;
import com.pinterest.deployservice.handler.HostHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HostHandler {
private static final Logger LOG = LoggerFactory.getLogger(HostHandler.class);
Expand All @@ -27,15 +24,36 @@ public HostHandler(ServiceContext serviceContext) {
hostTagDAO = serviceContext.getHostTagDAO();
}

public void removeHost(String hostId) throws Exception {
public void removeHost(String hostId) {
boolean hasException = false;
try {
hostDAO.deleteAllById(hostId);
tylerwowen marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
hasException = true;
LOG.error("Failed to remote host record " + hostId, e);
}
try {
agentDAO.deleteAllById(hostId);
} catch (Exception e) {
hasException = true;
LOG.error("Failed to remote host record " + hostId, e);
tylerwowen marked this conversation as resolved.
Show resolved Hide resolved
}
try {
hostTagDAO.deleteByHostId(hostId);
} catch (Exception e) {
hasException = true;
LOG.error("Failed to remote host record " + hostId, e);
tylerwowen marked this conversation as resolved.
Show resolved Hide resolved
}
try {
hostAgentDAO.delete(hostId);
LOG.info("Removed all records for the host {}", hostId);
} catch (Exception e) {
LOG.error("Failed to remove all records for the host {}, exception: {}", hostId, e);
hasException = true;
LOG.error("Failed to remote host record " + hostId, e);
}

if (!hasException) {
LOG.info("Removed all records for host {}", hostId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class ConfigHelper {
private static final int DEFAULT_PERIOD = 30;
private static final int DEFAULT_MAX_STALE_HOST_THRESHOLD = 600; // 10 mins
private static final int DEFAULT_MIN_STALE_HOST_THRESHOLD = 150;
private static final int DEFAULT_LAUNCH_LATENCY_THRESHOLD = 600;
private static final int DEFAULT_LAUNCH_LATENCY_THRESHOLD = 2 * 3600; // 2 hours
tylerwowen marked this conversation as resolved.
Show resolved Hide resolved
private static final String DEFAULT_DEPLOY_JANITOR_SCHEDULE = "0 30 3 * * ?";
private static final String DEFAULT_BUILD_JANITOR_SCHEDULE = "0 40 3 * * ?";
private static final int DEFAULT_MAX_DAYS_TO_KEEP = 180;
Expand Down Expand Up @@ -266,7 +266,7 @@ public static void scheduleWorkers(TeletraanServiceConfiguration configuration,
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
int minStaleHostThreshold = MapUtils.getIntValue(properties, "minStaleHostThreshold", DEFAULT_MIN_STALE_HOST_THRESHOLD);
int maxStaleHostThreshold = MapUtils.getIntValue(properties, "maxStaleHostThreshold", DEFAULT_MAX_STALE_HOST_THRESHOLD);
int maxLaunchLatencyThreshold = MapUtils.getIntValue(properties, "maxLaunchLaencyThreshold", DEFAULT_LAUNCH_LATENCY_THRESHOLD);
int maxLaunchLatencyThreshold = MapUtils.getIntValue(properties, "maxLaunchLatencyThreshold", DEFAULT_LAUNCH_LATENCY_THRESHOLD);
Runnable worker = new AgentJanitor(serviceContext, minStaleHostThreshold, maxStaleHostThreshold, maxLaunchLatencyThreshold);
scheduler.scheduleAtFixedRate(worker, initDelay, period, TimeUnit.SECONDS);
LOG.info("Scheduled AgentJanitor.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,83 +15,94 @@
*/
package com.pinterest.teletraan.worker;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pinterest.deployservice.ServiceContext;
import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.bean.HostAgentBean;
import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.bean.HostState;
import com.pinterest.deployservice.group.HostGroupManager;
import com.pinterest.deployservice.rodimus.RodimusManager;
import com.pinterest.deployservice.handler.HostHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
* Housekeeping on stuck and dead agents
* <p>
* if an agent has not ping server for certain time, we will cross check with
* authoritive source to confirm if the host is terminated, and handle the agent
* status accordingly
* Housekeeping on stuck and dead agents and hosts
*
* If an agent has not pinged the server for a certain time, we will cross check
* with the authoritative source to confirm if the host is terminated, and
* handle the agent status accordingly.
*
* If a host doesn't have any agent for a while, we will handle the host
* accordingly.
*/
public class AgentJanitor extends SimpleAgentJanitor {
private static final Logger LOG = LoggerFactory.getLogger(AgentJanitor.class);
private HostGroupManager hostGroupDAO;
private final RodimusManager rodimusManager;
private long maxLaunchLatencyThreshold;
private long absoluteThreshold = 7 * 24 * 3600 * 1000; // 7 days
private int agentlessHostBatchSize = 300;

/**
 * Creates the janitor.
 *
 * @param serviceContext source of the managers used by this worker
 * @param minStaleHostThreshold forwarded to the superclass constructor
 * @param maxStaleHostThreshold forwarded to the superclass constructor
 * @param maxLaunchLatencyThreshold launch latency threshold; multiplied by 1000
 *                                  before being stored (presumably seconds
 *                                  converted to milliseconds - confirm against
 *                                  the configuration)
 */
public AgentJanitor(ServiceContext serviceContext, int minStaleHostThreshold,
        int maxStaleHostThreshold, int maxLaunchLatencyThreshold) {
    super(serviceContext, minStaleHostThreshold, maxStaleHostThreshold);
    rodimusManager = serviceContext.getRodimusManager();
    // Multiply as long so that large configured values cannot overflow int
    // before being widened to the long field.
    this.maxLaunchLatencyThreshold = maxLaunchLatencyThreshold * 1000L;
}

private Collection<String> getTerminatedHostsFromSource(Collection<String> staleHostIds) throws Exception {
Collection<String> terminatedHosts = new ArrayList<>(staleHostIds);
for (String hostId : staleHostIds) {
Collection<String> resultIds = rodimusManager.getTerminatedHosts(Collections.singletonList(hostId));
if (resultIds.isEmpty()) {
terminatedHosts.remove(hostId);
/**
 * Asks rodimusManager which of the given hosts are terminated, querying in
 * batches of ten ids. A batch that fails is logged and skipped; the remaining
 * batches are still queried.
 *
 * @param staleHostIds host ids to check
 * @return ids reported as terminated by the batches that succeeded
 */
private Set<String> getTerminatedHostsFromSource(List<String> staleHostIds) {
    final int batchSize = 10;
    Set<String> terminatedHosts = new HashSet<>();
    int start = 0;
    while (start < staleHostIds.size()) {
        try {
            int end = Math.min(start + batchSize, staleHostIds.size());
            List<String> batch = staleHostIds.subList(start, end);
            terminatedHosts.addAll(rodimusManager.getTerminatedHosts(batch));
        } catch (Exception ex) {
            LOG.error("Failed to get terminated hosts", ex);
        }
        start += batchSize;
    }
    return terminatedHosts;
}

private Long getInstanceLaunchGracePeriod(String clusterName) throws Exception {
Long launchGracePeriod = (clusterName != null) ? rodimusManager.getClusterInstanceLaunchGracePeriod(clusterName) : null;
/**
 * Resolves the launch grace period to apply for the given cluster.
 *
 * @param clusterName cluster to look up; may be null
 * @return the value reported by rodimusManager multiplied by 1000, or
 *         maxLaunchLatencyThreshold when the cluster name is null, the lookup
 *         fails, or no value is reported
 */
private Long getInstanceLaunchGracePeriod(String clusterName) {
    Long launchGracePeriod = null;
    if (clusterName != null) {
        try {
            launchGracePeriod = rodimusManager.getClusterInstanceLaunchGracePeriod(clusterName);
        } catch (Exception ex) {
            // The exception is passed as the trailing argument without its own
            // placeholder so that the logger records it as a throwable.
            LOG.error("failed to get launch grace period for cluster {}", clusterName, ex);
        }
    }
    return launchGracePeriod == null ? maxLaunchLatencyThreshold : launchGracePeriod * 1000;
}

// Process stale hosts (hosts which have not pinged for more than min threshold period)
// Removes hosts once confirmed with source
private void processLowWatermarkHosts() throws Exception {
long current_time = System.currentTimeMillis();
// If host fails to ping for longer than min stale threshold,
// either mark them as UNREACHABLE, or remove if confirmed with source of truth
long minThreshold = current_time - minStaleHostThreshold;
List<HostAgentBean> minStaleHosts = hostAgentDAO.getStaleHosts(minThreshold);
Set<String> minStaleHostIds = new HashSet<>();
for (HostAgentBean hostAgentBean: minStaleHosts) {
minStaleHostIds.add(hostAgentBean.getHost_id());
private boolean isHostStale(HostAgentBean hostAgentBean) {
if (hostAgentBean == null || hostAgentBean.getLast_update() == null) {
return false;
}
Collection<String> terminatedHosts = getTerminatedHostsFromSource(minStaleHostIds);
for (String removedId: terminatedHosts) {
removeStaleHost(removedId);

long current_time = System.currentTimeMillis();
if (current_time - hostAgentBean.getLast_update() >= absoluteThreshold) {
return true;
}
}

private boolean isHostStale(HostAgentBean hostAgentBean) throws Exception {
if (hostAgentBean == null || hostAgentBean.getLast_update() == null) {
HostBean hostBean;
try {
hostBean = hostDAO.getHostsByHostId(hostAgentBean.getHost_id()).get(0);
} catch (Exception ex) {
LOG.error("failed to get host bean for ({}), {}", hostAgentBean, ex);
return false;
}
HostBean hostBean = hostDAO.getHostsByHostId(hostAgentBean.getHost_id()).get(0);
long current_time = System.currentTimeMillis();

Long launchGracePeriod = getInstanceLaunchGracePeriod(hostAgentBean.getAuto_scaling_group());
if ((hostBean.getState() == HostState.PROVISIONED) && (current_time - hostAgentBean.getLast_update() >= launchGracePeriod)) {
if ((hostBean.getState() == HostState.PROVISIONED)
&& (current_time - hostAgentBean.getLast_update() >= launchGracePeriod)) {
return true;
}
if (hostBean.getState() != HostState.TERMINATING && !hostBean.isPendingTerminate() &&
Expand All @@ -101,33 +112,101 @@ private boolean isHostStale(HostAgentBean hostAgentBean) throws Exception {
return false;
}

// Process stale hosts (hosts which have not pinged for more than max threshold period)
// Marks hosts unreachable if it's stale for max threshold
// Removes hosts once confirmed with source
private void processHighWatermarkHosts() throws Exception {
/**
* Process stale hosts which have not pinged since
* current_time - minStaleHostThreshold
* They will be candidates for stale hosts which will be removed in future
* executions.
* Either mark them as UNREACHABLE, or remove if confirmed with source of truth.
*/
private void determineStaleHostCandidates() {
long current_time = System.currentTimeMillis();
long maxThreshold = current_time - Math.min(maxStaleHostThreshold, maxLaunchLatencyThreshold);
List<HostAgentBean> maxStaleHosts = hostAgentDAO.getStaleHosts(maxThreshold);
Set<String> staleHostIds = new HashSet<>();
long minThreshold = current_time - minStaleHostThreshold;
List<HostAgentBean> unreachableHosts;
try {
unreachableHosts = hostAgentDAO.getStaleHosts(minThreshold);
} catch (Exception ex) {
LOG.error("failed to get unreachable hosts", ex);
return;
}
ArrayList<String> unreachableHostIds = new ArrayList<>();
unreachableHosts.stream().map(hostAgent -> unreachableHostIds.add(hostAgent.getHost_id()));

for (HostAgentBean hostAgentBean : maxStaleHosts) {
if (isHostStale(hostAgentBean)) {
staleHostIds.add(hostAgentBean.getHost_id());
Set<String> terminatedHosts = getTerminatedHostsFromSource(unreachableHostIds);
for (String unreachableId : unreachableHostIds) {
if (terminatedHosts.contains(unreachableId)) {
removeStaleHost(unreachableId);
} else {
markUnreachableHost(unreachableId);
}
}
Collection<String> terminatedHosts = getTerminatedHostsFromSource(staleHostIds);
for (String staleId : staleHostIds) {
}

/**
* Process stale hosts which have not pinged since
* current_time - maxStaleHostThreshold
* They are confirmed stale hosts, should be removed from Teletraan
*/
private void processStaleHosts() {
long current_time = System.currentTimeMillis();
long maxThreshold = current_time - maxStaleHostThreshold;
List<HostAgentBean> staleHosts;
try {
staleHosts = hostAgentDAO.getStaleHosts(maxThreshold);
} catch (Exception ex) {
LOG.error("failed to get stale hosts", ex);
return;
}

Map<String, HostAgentBean> staleHostMap = new HashMap<>();
staleHosts.stream().map(hostAgent -> staleHostMap.put(hostAgent.getHost_id(), hostAgent));

Set<String> terminatedHosts = getTerminatedHostsFromSource(new ArrayList<>(staleHostMap.keySet()));
for (String staleId : staleHostMap.keySet()) {
if (terminatedHosts.contains(staleId)) {
removeStaleHost(staleId);
} else {
markUnreachableHost(staleId);
HostAgentBean hostAgent = staleHostMap.get(staleId);
if (isHostStale(hostAgent)) {
LOG.warn("Agent ({}) is stale (not Pinning Teletraan), but might be running.",
tylerwowen marked this conversation as resolved.
Show resolved Hide resolved
hostAgent);
}
}
}
}

/**
 * Cleans up hosts that have no agent record.
 *
 * A host that is added directly to Teletraan has no agent associated with it
 * at first, and it may get stuck in that state. This method fetches up to
 * agentlessHostBatchSize such hosts that have not been updated within
 * maxLaunchLatencyThreshold, removes those the source of truth reports as
 * terminated, and logs a warning for the rest.
 */
private void cleanUpAgentlessHosts() {
    long cutoff = System.currentTimeMillis() - maxLaunchLatencyThreshold;
    List<String> candidateIds;
    try {
        candidateIds = hostDAO.getStaleAgentlessHostIds(cutoff, agentlessHostBatchSize);
    } catch (SQLException ex) {
        LOG.error("failed to get agentless hosts", ex);
        return;
    }

    Set<String> terminated = getTerminatedHostsFromSource(candidateIds);
    for (String hostId : candidateIds) {
        if (!terminated.contains(hostId)) {
            LOG.warn("Agentless host {} is stale but might be running", hostId);
            continue;
        }
        removeStaleHost(hostId);
    }
}

@Override
void processAllHosts() throws Exception {
processLowWatermarkHosts();
processHighWatermarkHosts();
void processAllHosts() {
// One janitor pass, in this order: hosts past maxStaleHostThreshold,
// then hosts past minStaleHostThreshold, then hosts without any agent.
processStaleHosts();
determineStaleHostCandidates();
cleanUpAgentlessHosts();
}
}
Loading
Loading