diff --git a/deploy-service/common/pom.xml b/deploy-service/common/pom.xml
index e7cc839493..72342a175f 100644
--- a/deploy-service/common/pom.xml
+++ b/deploy-service/common/pom.xml
@@ -19,6 +19,26 @@
3.1.0
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.3.0
+
+ false
+
+
+
+
+ test-jar
+
+
+
+
+
+
+
com.pinterest.teletraan
diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java
index add0f06ea9..24dd0f47b3 100644
--- a/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java
+++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/dao/HostDAO.java
@@ -58,6 +58,8 @@ public interface HostDAO {
List getStaleAgentlessHostIds(long lastUpdateBefore, int limit) throws SQLException;
+ List getAgentlessHosts(long lastUpdateAfter, int limit) throws SQLException;
+
Collection getHostsByEnvId(String envId) throws Exception;
HostBean getByEnvIdAndHostId(String envId, String hostId) throws Exception;
diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java
index abd92cc45a..0133e8c084 100644
--- a/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java
+++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/db/DBHostDAOImpl.java
@@ -53,6 +53,7 @@ public class DBHostDAOImpl implements HostDAO {
private static final String GET_HOSTS_BY_STATES = "SELECT * FROM hosts WHERE state in (?, ?, ?) GROUP BY host_id ORDER BY last_update";
private static final String GET_GROUP_NAMES_BY_HOST = "SELECT group_name FROM hosts WHERE host_name=?";
private static final String GET_STALE_AGENTLESS_HOST_IDS = "SELECT DISTINCT hosts.host_id FROM hosts LEFT JOIN hosts_and_agents ON hosts.host_id = hosts_and_agents.host_id WHERE hosts.last_update < ? AND hosts_and_agents.host_id IS NULL ORDER BY hosts.last_update DESC LIMIT ?";
+ private static final String GET_AGENTLESS_HOSTS = "SELECT hosts.* FROM hosts LEFT JOIN hosts_and_agents ON hosts.host_id = hosts_and_agents.host_id WHERE hosts.last_update > ? AND hosts_and_agents.host_id IS NULL ORDER BY hosts.last_update DESC LIMIT ?";
private static final String GET_HOST_NAMES_BY_GROUP = "SELECT host_name FROM hosts WHERE group_name=?";
private static final String GET_HOST_IDS_BY_GROUP = "SELECT DISTINCT host_id FROM hosts WHERE group_name=?";
private static final String GET_HOSTS_BY_ENVID = "SELECT h.* FROM hosts h INNER JOIN groups_and_envs ge ON ge.group_name = h.group_name WHERE ge.env_id=? UNION DISTINCT SELECT hs.* FROM hosts hs INNER JOIN hosts_and_envs he ON he.host_name = hs.host_name WHERE he.env_id=?";
@@ -203,6 +204,13 @@ public List getStaleAgentlessHostIds(long lastUpdateBefore, int limit) t
SingleResultSetHandlerFactory.newListObjectHandler(), lastUpdateBefore, limit);
}
+
+ @Override
+ public List getAgentlessHosts(long lastUpdateAfter, int limit) throws SQLException {
+ ResultSetHandler> h = new BeanListHandler<>(HostBean.class);
+ return new QueryRunner(dataSource).query(GET_AGENTLESS_HOSTS, h, lastUpdateAfter, limit);
+ }
+
@Override
public List getAllActiveHostsByGroup(String groupName) throws Exception {
ResultSetHandler> h = new BeanListHandler<>(HostBean.class);
diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/DefaultHostClassifier.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/DefaultHostClassifier.java
new file mode 100644
index 0000000000..1f7e411318
--- /dev/null
+++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/DefaultHostClassifier.java
@@ -0,0 +1,103 @@
+package com.pinterest.deployservice.metrics;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.pinterest.deployservice.bean.HostBean;
+
+public class DefaultHostClassifier implements HostClassifier {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultHostClassifier.class);
+
+ private List carryoverHosts = new ArrayList<>();
+ private List newHosts = new ArrayList<>();
+ private List removedHosts = new ArrayList<>();
+ private List timeoutHosts = new ArrayList<>();
+ private @Nonnull List initializingHosts = new ArrayList<>();
+
+ @Override
+ public List getTimeoutHosts() {
+ return timeoutHosts;
+ }
+
+ @Override
+ public List getRemovedHosts() {
+ return removedHosts;
+ }
+
+ @Override
+ public List getNewHosts() {
+ return newHosts;
+ }
+
+ @Override
+ public List getCarryoverHosts() {
+ return carryoverHosts;
+ }
+
+ @Override
+ public List getInitializingHosts() {
+ return initializingHosts;
+ }
+
+ private Map getInitializingHostMap() {
+ return Maps.uniqueIndex(initializingHosts, HostBean::getHost_id);
+ }
+
+ @Override
+ public void updateClassification(Collection agentlessHosts, Instant timeoutInstant) {
+ Map uniqueAgentlessHostMap = deduplicateHosts(agentlessHosts);
+ Map previousInitializingHosts = getInitializingHostMap();
+ Map removedHostMap = new HashMap<>(previousInitializingHosts);
+
+ List newTimeoutHosts = new ArrayList<>();
+ List newlyLaunchedHosts = new ArrayList<>();
+ List newCarryoverHosts = new ArrayList<>();
+
+ initializingHosts = new ArrayList<>(uniqueAgentlessHostMap.values());
+ for (HostBean host : initializingHosts) {
+ removedHostMap.remove(host.getHost_id());
+ Instant hostCreationInstant = Instant.ofEpochMilli(host.getCreate_date());
+ if (hostCreationInstant.isBefore(timeoutInstant)) {
+ newTimeoutHosts.add(host);
+ }
+ if (previousInitializingHosts.containsKey(host.getHost_id())) {
+ newCarryoverHosts.add(host);
+ } else {
+ newlyLaunchedHosts.add(host);
+ }
+ }
+
+ removedHosts = new ArrayList<>(removedHostMap.values());
+ carryoverHosts = newCarryoverHosts;
+ timeoutHosts = newTimeoutHosts;
+ newHosts = newlyLaunchedHosts;
+
+ LOG.info(
+ "Host classification of {} agentless hosts based on {} previous initializing hosts: {} new, {} carryover, {} removed, {} timeout, {} initializing",
+ agentlessHosts.size(), previousInitializingHosts.size(), newHosts.size(), carryoverHosts.size(),
+ removedHosts.size(), timeoutHosts.size(), initializingHosts.size());
+ }
+
+ private Map deduplicateHosts(Collection agentlessHosts) {
+ Map uniqueHosts = new HashMap<>();
+ for (HostBean host : agentlessHosts) {
+ if (!uniqueHosts.containsKey(host.getHost_id()) ||
+ host.getCreate_date() < uniqueHosts.get(host.getHost_id()).getCreate_date()) {
+ uniqueHosts.put(host.getHost_id(), host);
+ }
+ }
+ return uniqueHosts;
+ }
+
+}
diff --git a/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/HostClassifier.java b/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/HostClassifier.java
new file mode 100644
index 0000000000..2cc4b23f73
--- /dev/null
+++ b/deploy-service/common/src/main/java/com/pinterest/deployservice/metrics/HostClassifier.java
@@ -0,0 +1,68 @@
+package com.pinterest.deployservice.metrics;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+
+import com.pinterest.deployservice.bean.HostBean;
+
+public interface HostClassifier {
+
+ /**
+ * Retrieves hosts that are newly added.
+ *
+ * Note this is a subset of hosts that are initializing.
+ *
+ * @return a list of newly added hosts
+ */
+ List getNewHosts();
+
+ /**
+ * Retrieves hosts that are carried over from last update.
+ *
+ * Note this is a subset of hosts that are initializing.
+ *
+ * @return a list of carried over hosts
+ */
+ List getCarryoverHosts();
+
+ /**
+ * Retrieves hosts that have timed out.
+ *
+ * Note this is a subset of hosts that are initializing.
+ *
+ * @return a list of hosts that have timed out
+ */
+ List getTimeoutHosts();
+
+ /**
+ * Retrieves hosts that have been removed.
+ *
+ * Specifically, a previously initializing host that is no longer in the
+ * provided host list.
+ * A host can be absent from the provided host list for 2 reasons:
+ * 1. It has been initialized
+ * 2. It has been taking too long to initialize
+ *
+ * @return a list of hosts that have been removed
+ */
+ List getRemovedHosts();
+
+ /**
+ * Retrieves hosts that are currently initializing.
+ *
+ * Note this is the union of newly added and carryover hosts.
+ *
+ * @return a list of hosts that are currently initializing
+ */
+ List getInitializingHosts();
+
+ /**
+ * Updates the classification of hosts.
+ *
+ * @param hosts the collection of hosts to update the classification
+ * with
+ * @param timeoutInstant the instant used to determine the timeout hosts
+ */
+ void updateClassification(Collection hosts, Instant timeoutInstant);
+}
\ No newline at end of file
diff --git a/deploy-service/common/src/test/java/com/pinterest/deployservice/bean/BeanUtils.java b/deploy-service/common/src/test/java/com/pinterest/deployservice/bean/BeanUtils.java
new file mode 100644
index 0000000000..c41fd170de
--- /dev/null
+++ b/deploy-service/common/src/test/java/com/pinterest/deployservice/bean/BeanUtils.java
@@ -0,0 +1,17 @@
+package com.pinterest.deployservice.bean;
+
+import java.time.Instant;
+import java.util.UUID;
+
+public class BeanUtils {
+ public static HostBean createHostBean(Instant createDate) {
+ HostBean bean = new HostBean();
+ bean.setHost_id("i-" + UUID.randomUUID().toString().substring(0, 8));
+ bean.setGroup_name("testEnv-testStage");
+ bean.setCreate_date(createDate.toEpochMilli());
+ bean.setLast_update(createDate.plusSeconds(1).toEpochMilli());
+ bean.setCan_retire(0);
+ bean.setState(HostState.PROVISIONED);
+ return bean;
+ }
+}
diff --git a/deploy-service/common/src/test/java/com/pinterest/deployservice/metrics/DefaultHostClassifierTest.java b/deploy-service/common/src/test/java/com/pinterest/deployservice/metrics/DefaultHostClassifierTest.java
new file mode 100644
index 0000000000..fca2f83166
--- /dev/null
+++ b/deploy-service/common/src/test/java/com/pinterest/deployservice/metrics/DefaultHostClassifierTest.java
@@ -0,0 +1,190 @@
+package com.pinterest.deployservice.metrics;
+
+import static com.pinterest.deployservice.bean.BeanUtils.createHostBean;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.pinterest.deployservice.bean.HostBean;
+
+public class DefaultHostClassifierTest {
+
+ private DefaultHostClassifier sut;
+ private final Instant timeoutInstant = Instant.now().minus(Duration.ofMinutes(1l));
+ private List regularHost;
+ private List timeoutHost;
+
+ @Before
+ public void setUp() {
+ sut = new DefaultHostClassifier();
+ regularHost = Collections.singletonList(createHostBean(Instant.now()));
+ timeoutHost = Collections
+ .singletonList(createHostBean(Instant.now().minus(Duration.ofMinutes(2l))));
+ }
+
+ @Test
+ public void testGetCarryoverHosts() {
+ assertEquals(0, sut.getCarryoverHosts().size());
+ }
+
+ @Test
+ public void testGetRemovedHosts() {
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ public void testGetInitializingHosts() {
+ assertEquals(0, sut.getInitializingHosts().size());
+ }
+
+ @Test
+ public void testGetNewHosts() {
+ assertEquals(0, sut.getNewHosts().size());
+ }
+
+ @Test
+ public void testGetTimeoutHosts() {
+ assertEquals(0, sut.getTimeoutHosts().size());
+ }
+
+ @Test
+ public void oneInvocation_regularHost() {
+ sut.updateClassification(regularHost, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getInitializingHosts()));
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertEquals(0, sut.getCarryoverHosts().size());
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ public void oneInvocation_timeOutHost() {
+ sut.updateClassification(timeoutHost, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getInitializingHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getTimeoutHosts()));
+ assertEquals(0, sut.getCarryoverHosts().size());
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ public void twoInvocations_sameHost() {
+ sut.updateClassification(regularHost, timeoutInstant);
+ sut.updateClassification(regularHost, timeoutInstant);
+
+ assertEquals(0, sut.getNewHosts().size());
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getInitializingHosts()));
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getCarryoverHosts()));
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void twoInvocations_newRegularHost() {
+ List secondHost = Collections.singletonList(createHostBean(Instant.now()));
+ List allHosts = (List) CollectionUtils.union(regularHost, secondHost);
+
+ sut.updateClassification(regularHost, timeoutInstant);
+ sut.updateClassification(allHosts, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(secondHost, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(allHosts, sut.getInitializingHosts()));
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getCarryoverHosts()));
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ public void twoInvocations_newRegularHostOnly() {
+ List secondHost = Collections.singletonList(createHostBean(Instant.now()));
+
+ sut.updateClassification(regularHost, timeoutInstant);
+ sut.updateClassification(secondHost, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(secondHost, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(secondHost, sut.getInitializingHosts()));
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertEquals(0, sut.getCarryoverHosts().size());
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getRemovedHosts()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void twoInvocations_timeoutHost() {
+ List allHosts = (List) CollectionUtils.union(regularHost, timeoutHost);
+ sut.updateClassification(regularHost, timeoutInstant);
+ sut.updateClassification(allHosts, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(allHosts, sut.getInitializingHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getTimeoutHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getCarryoverHosts()));
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void twoInvocations_timeoutHostCompleted() {
+ sut.updateClassification(timeoutHost, timeoutInstant);
+ sut.updateClassification(CollectionUtils.EMPTY_COLLECTION, timeoutInstant);
+
+ assertEquals(0, sut.getNewHosts().size());
+ assertEquals(0, sut.getInitializingHosts().size());
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertEquals(0, sut.getCarryoverHosts().size());
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getRemovedHosts()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void twoInvocations_complex() {
+ List carryOverHost = Collections.singletonList(createHostBean(Instant.now()));
+ List newHost = Collections.singletonList(createHostBean(Instant.now()));
+ List agentLessHosts = new ArrayList<>(carryOverHost);
+ agentLessHosts.addAll(newHost);
+ agentLessHosts.addAll(timeoutHost);
+
+ sut.updateClassification(CollectionUtils.union(regularHost, carryOverHost), timeoutInstant);
+ sut.updateClassification(agentLessHosts, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(CollectionUtils.union(newHost, timeoutHost), sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(agentLessHosts, sut.getInitializingHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(timeoutHost, sut.getTimeoutHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(carryOverHost, sut.getCarryoverHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(regularHost, sut.getRemovedHosts()));
+ }
+
+ @Test
+ public void duplicateHostId_earliestHostOnly() {
+ List hosts = new ArrayList<>(regularHost);
+ HostBean laterHost = createHostBean(Instant.now());
+ HostBean earlierHost = createHostBean(Instant.ofEpochMilli(regularHost.get(0).getCreate_date() - 1000));
+
+ laterHost.setHost_id(regularHost.get(0).getHost_id());
+ earlierHost.setHost_id(regularHost.get(0).getHost_id());
+
+ hosts.add(laterHost);
+ hosts.add(earlierHost);
+ List expectedHosts = Collections.singletonList(earlierHost);
+
+ sut.updateClassification(hosts, timeoutInstant);
+
+ assertTrue(CollectionUtils.isEqualCollection(expectedHosts, sut.getNewHosts()));
+ assertTrue(CollectionUtils.isEqualCollection(expectedHosts, sut.getInitializingHosts()));
+ assertEquals(0, sut.getTimeoutHosts().size());
+ assertEquals(0, sut.getCarryoverHosts().size());
+ assertEquals(0, sut.getRemovedHosts().size());
+ }
+}
diff --git a/deploy-service/teletraanservice/pom.xml b/deploy-service/teletraanservice/pom.xml
index bd91ba8026..ebea17ba5a 100644
--- a/deploy-service/teletraanservice/pom.xml
+++ b/deploy-service/teletraanservice/pom.xml
@@ -75,6 +75,20 @@
+
+ com.pinterest.teletraan
+ common
+ tests
+ test-jar
+ 0.1-SNAPSHOT
+ test
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
com.pinterest.teletraan
universal
diff --git a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/MetricsEmitter.java b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/MetricsEmitter.java
index 0dea64b80c..238822497e 100644
--- a/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/MetricsEmitter.java
+++ b/deploy-service/teletraanservice/src/main/java/com/pinterest/teletraan/worker/MetricsEmitter.java
@@ -1,15 +1,34 @@
package com.pinterest.teletraan.worker;
+import static com.pinterest.teletraan.universal.metrics.micrometer.PinStatsNamingConvention.CUSTOM_NAME_PREFIX;
+
import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pinterest.deployservice.ServiceContext;
+import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.dao.DeployDAO;
import com.pinterest.deployservice.dao.HostAgentDAO;
+import com.pinterest.deployservice.dao.HostDAO;
+import com.pinterest.deployservice.metrics.DefaultHostClassifier;
+import com.pinterest.deployservice.metrics.HostClassifier;
+import com.pinterest.teletraan.universal.metrics.ErrorBudgetCounterFactory;
+import com.pinterest.teletraan.universal.metrics.micrometer.PinStatsLongTaskTimer;
+import com.pinterest.teletraan.universal.metrics.micrometer.PinStatsMeterRegistry;
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Metrics;
public class MetricsEmitter implements Runnable {
@@ -18,54 +37,143 @@ public class MetricsEmitter implements Runnable {
static final String HOSTS_TOTAL = "hosts.total";
static final String DEPLOYS_TODAY_TOTAL = "deploys.today.total";
static final String DEPLOYS_RUNNING_TOTAL = "deploys.running.total";
+ static final String HOSTS_LAUNCHING = CUSTOM_NAME_PREFIX + "teletraan.%s.hosts_launching";
+ static final String ERROR_BUDGET_METHOD_NAME = "host_launch";
+ static final int LAUNCH_TIMEOUT_MINUTE = 10;
+ static final int MAX_TRACK_DURATION_MINUTE = LAUNCH_TIMEOUT_MINUTE * 10; // keep tracking for 10x timeout
+
+ static long reportHostsCount(HostAgentDAO hostAgentDAO) {
+ try {
+ return hostAgentDAO.getDistinctHostsCount();
+ } catch (SQLException e) {
+ LOG.error("Failed to get host count", e);
+ }
+ return 0;
+ }
+
+ static long reportDailyDeployCount(DeployDAO deployDAO) {
+ try {
+ return deployDAO.getDailyDeployCount();
+ } catch (SQLException e) {
+ LOG.error("Failed to get daily deploy count", e);
+ }
+ return 0;
+ }
+
+ static long reportRunningDeployCount(DeployDAO deployDAO) {
+ try {
+ return deployDAO.getRunningDeployCount();
+ } catch (SQLException e) {
+ LOG.error("Failed to get running deploy count", e);
+ }
+ return 0;
+ }
+
+ private final Clock clock;
+ private final HostClassifier hostClassifier;
+ private final HostDAO hostDAO;
+ private final Map hostTimers = new HashMap<>();
+ private final Counter errorBudgetSuccess;
+ private final Counter errorBudgetFailure;
public MetricsEmitter(ServiceContext serviceContext) {
+ this(serviceContext, Clock.SYSTEM);
+ }
+
+ public MetricsEmitter(ServiceContext serviceContext, Clock clock) {
+ this.clock = clock;
// HostAgentDAO is more efficient than HostDAO to get total hosts
Gauge.builder(HOSTS_TOTAL, serviceContext.getHostAgentDAO(), MetricsEmitter::reportHostsCount)
.strongReference(true)
.register(Metrics.globalRegistry);
-
Gauge.builder(DEPLOYS_TODAY_TOTAL, serviceContext.getDeployDAO(), MetricsEmitter::reportDailyDeployCount)
.strongReference(true)
.register(Metrics.globalRegistry);
Gauge.builder(DEPLOYS_RUNNING_TOTAL, serviceContext.getDeployDAO(), MetricsEmitter::reportRunningDeployCount)
.strongReference(true)
.register(Metrics.globalRegistry);
+
+ hostDAO = serviceContext.getHostDAO();
+ hostClassifier = new DefaultHostClassifier();
+ errorBudgetSuccess = ErrorBudgetCounterFactory.createSuccessCounter(ERROR_BUDGET_METHOD_NAME);
+ errorBudgetFailure = ErrorBudgetCounterFactory.createFailureCounter(ERROR_BUDGET_METHOD_NAME);
}
@Override
public void run() {
- // noop
- // The initial gauges are registered in the constructor,
- // and they will be updated automatically.
- // If we want to add more complex metrics that require extra logic,
- // we can add them here.
+ emitLaunchingMetrics();
}
- static long reportHostsCount(HostAgentDAO hostAgentDAO) {
+ void emitLaunchingMetrics() {
+ Instant timeoutCutoff = Instant.ofEpochMilli(clock.wallTime()).minus(Duration.ofMinutes(LAUNCH_TIMEOUT_MINUTE));
+ updateHostClassification(timeoutCutoff);
try {
- return hostAgentDAO.getDistinctHostsCount();
- } catch (SQLException e) {
- LOG.error("Failed to get host count", e);
+ processRemovedHosts();
+ processNewHosts();
+ } catch (Exception e) {
+ LOG.error("Failed to update host classification", e);
}
- return 0;
+ cleanUpTimers();
}
- static long reportDailyDeployCount(DeployDAO deployDAO) {
+ private void updateHostClassification(Instant timeoutCutoff) {
try {
- return deployDAO.getDailyDeployCount();
+ List agentlessHosts = hostDAO
+ .getAgentlessHosts(Instant.ofEpochMilli(clock.wallTime()).minus(Duration.ofMinutes(MAX_TRACK_DURATION_MINUTE)).toEpochMilli(), 10000);
+ hostClassifier.updateClassification(agentlessHosts, timeoutCutoff);
} catch (SQLException e) {
- LOG.error("Failed to get daily deploy count", e);
+ LOG.error("Failed to get agentless hosts", e);
}
- return 0;
}
- static long reportRunningDeployCount(DeployDAO deployDAO) {
- try {
- return deployDAO.getRunningDeployCount();
- } catch (SQLException e) {
- LOG.error("Failed to get running deploy count", e);
+ private void processRemovedHosts() {
+ Collection removedHosts = hostClassifier.getRemovedHosts();
+ for (HostBean host : removedHosts) {
+ String hostId = host.getHost_id();
+
+ if (hostTimers.containsKey(hostId)) {
+ LongTaskTimer.Sample sample = hostTimers.remove(hostId);
+ if (sample.duration(TimeUnit.MILLISECONDS) > Duration.ofMinutes(LAUNCH_TIMEOUT_MINUTE).toMillis()) {
+ // Only consider hosts that have been launched after timeout cutoff
+ errorBudgetFailure.increment();
+ } else {
+ errorBudgetSuccess.increment();
+ }
+ sample.stop();
+ } else {
+ LOG.warn("Timer for removed host {} not found, skip", hostId);
+ }
+ }
+ }
+
+ private void processNewHosts() {
+ // Only `PinStatsMeterRegistry` can create `PinStatsLongTaskTimer`
+ PinStatsMeterRegistry registry = (PinStatsMeterRegistry) Metrics.globalRegistry.getRegistries().stream()
+ .filter(r -> r instanceof PinStatsMeterRegistry).findFirst().get();
+ if (registry != null) {
+ Collection newHosts = hostClassifier.getNewHosts();
+ for (HostBean host : newHosts) {
+ String timerName = String.format(HOSTS_LAUNCHING, host.getGroup_name());
+ PinStatsLongTaskTimer timer = (PinStatsLongTaskTimer) LongTaskTimer.builder(timerName)
+ .serviceLevelObjectives(Duration.ofMinutes(LAUNCH_TIMEOUT_MINUTE))
+ .register(registry);
+ hostTimers.put(host.getHost_id(), timer.start(Instant.ofEpochMilli(host.getCreate_date())));
+ }
+ }
+ }
+
+ /*
+ * Clean up timers for hosts that have been initializing for too long
+ */
+ private void cleanUpTimers() {
+ for (String hostId : hostTimers.keySet()) {
+ LongTaskTimer.Sample sample = hostTimers.get(hostId);
+ if (sample.duration(TimeUnit.MINUTES) > (double) MAX_TRACK_DURATION_MINUTE) {
+ sample.stop();
+ hostTimers.remove(hostId);
+ errorBudgetFailure.increment();
+ LOG.info("Removed timer for host {} after max tracking duration", hostId);
+ }
}
- return 0;
}
}
diff --git a/deploy-service/teletraanservice/src/test/java/com/pinterest/teletraan/worker/MetricsEmitterTest.java b/deploy-service/teletraanservice/src/test/java/com/pinterest/teletraan/worker/MetricsEmitterTest.java
index f9cbb2f213..244c37e425 100644
--- a/deploy-service/teletraanservice/src/test/java/com/pinterest/teletraan/worker/MetricsEmitterTest.java
+++ b/deploy-service/teletraanservice/src/test/java/com/pinterest/teletraan/worker/MetricsEmitterTest.java
@@ -1,10 +1,20 @@
package com.pinterest.teletraan.worker;
+import static com.pinterest.deployservice.bean.BeanUtils.createHostBean;
+import static com.pinterest.teletraan.universal.metrics.ErrorBudgetCounterFactory.ERROR_BUDGET_METRIC_NAME;
+import static com.pinterest.teletraan.universal.metrics.ErrorBudgetCounterFactory.ERROR_BUDGET_TAG_VALUE_RESPONSE_TYPE_SUCCESS;
+import static com.pinterest.teletraan.universal.metrics.ErrorBudgetCounterFactory.ERROR_BUDGET_TAG_VALUE_RESPONSE_TYPE_FAILURE;
+import static com.pinterest.teletraan.universal.metrics.ErrorBudgetCounterFactory.ERROR_BUDGET_TAG_NAME_RESPONSE_TYPE;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
import org.junit.After;
import org.junit.Before;
@@ -12,16 +22,27 @@
import org.junit.Test;
import com.pinterest.deployservice.ServiceContext;
+import com.pinterest.deployservice.bean.HostBean;
import com.pinterest.deployservice.dao.DeployDAO;
import com.pinterest.deployservice.dao.HostAgentDAO;
-
+import com.pinterest.deployservice.dao.HostDAO;
+import com.pinterest.teletraan.universal.metrics.micrometer.PinStatsConfig;
+import com.pinterest.teletraan.universal.metrics.micrometer.PinStatsMeterRegistry;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.LongTaskTimer;
+import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
public class MetricsEmitterTest {
private HostAgentDAO hostAgentDAO;
+ private HostDAO hostDAO;
private DeployDAO deployDAO;
+ private ServiceContext serviceContext;
@BeforeClass
public static void setUpClass() {
@@ -30,8 +51,14 @@ public static void setUpClass() {
@Before
public void setUp() {
+ hostDAO = mock(HostDAO.class);
hostAgentDAO = mock(HostAgentDAO.class);
deployDAO = mock(DeployDAO.class);
+
+ serviceContext = new ServiceContext();
+ serviceContext.setHostAgentDAO(hostAgentDAO);
+ serviceContext.setDeployDAO(deployDAO);
+ serviceContext.setHostDAO(hostDAO);
}
@After
@@ -65,25 +92,105 @@ public void testReportRunningDeployCount() throws SQLException {
@Test
public void testMetricsEmitter() throws SQLException {
- ServiceContext serviceContext = new ServiceContext();
- serviceContext.setHostAgentDAO(hostAgentDAO);
- serviceContext.setDeployDAO(deployDAO);
+ new MetricsEmitter(serviceContext);
- MetricsEmitter sut = new MetricsEmitter(serviceContext);
+ Gauge deploysTotal = Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_TODAY_TOTAL).gauge();
+ Gauge deploysRunning = Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_RUNNING_TOTAL).gauge();
when(hostAgentDAO.getDistinctHostsCount()).thenReturn(2L);
assertEquals(2, Metrics.globalRegistry.get(MetricsEmitter.HOSTS_TOTAL).gauge().value(), 0.01);
when(deployDAO.getDailyDeployCount()).thenReturn(1L);
- assertEquals(1, Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_TODAY_TOTAL).gauge().value(), 0.01);
+ assertEquals(1, deploysTotal.value(), 0.01);
when(deployDAO.getDailyDeployCount()).thenReturn(5L);
- assertEquals(5, Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_TODAY_TOTAL).gauge().value(), 0.01);
+ assertEquals(5, deploysTotal.value(), 0.01);
when(deployDAO.getRunningDeployCount()).thenReturn(3L);
- assertEquals(3, Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_RUNNING_TOTAL).gauge().value(), 0.01);
+ assertEquals(3, deploysRunning.value(), 0.01);
when(deployDAO.getRunningDeployCount()).thenReturn(2L);
- assertEquals(2, Metrics.globalRegistry.get(MetricsEmitter.DEPLOYS_RUNNING_TOTAL).gauge().value(), 0.01);
+ assertEquals(2, deploysRunning.value(), 0.01);
+ }
+
+ @Test
+ public void testEmitLaunchingMetrics() throws SQLException {
+ MockClock clock = new MockClock();
+ MeterRegistry registry = new PinStatsMeterRegistry(PinStatsConfig.DEFAULT, clock);
+ Metrics.globalRegistry.add(registry);
+ MetricsEmitter sut = new MetricsEmitter(serviceContext, clock);
+
+ long t1 = clock.wallTime();
+ clock.add(Duration.ofMinutes(MetricsEmitter.LAUNCH_TIMEOUT_MINUTE + 1));
+ long t2 = clock.wallTime();
+ HostBean timeoutHost = createHostBean(Instant.ofEpochMilli(t1));
+ HostBean normalHost = createHostBean(Instant.ofEpochMilli(t2));
+ HostBean carryOverHost = createHostBean(Instant.ofEpochMilli(t2));
+ HostBean cleanedUpHost = createHostBean(Instant.ofEpochMilli(t2));
+
+ // T2
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList(timeoutHost, normalHost, carryOverHost, cleanedUpHost));
+ sut.emitLaunchingMetrics();
+
+ LongTaskTimer timer = registry.get(String.format(MetricsEmitter.HOSTS_LAUNCHING, timeoutHost.getGroup_name()))
+ .longTaskTimer();
+ Counter successCounter = Metrics.globalRegistry.get(ERROR_BUDGET_METRIC_NAME)
+ .tag(ERROR_BUDGET_TAG_NAME_RESPONSE_TYPE, ERROR_BUDGET_TAG_VALUE_RESPONSE_TYPE_SUCCESS).counter();
+ Counter failureCounter = Metrics.globalRegistry.get(ERROR_BUDGET_METRIC_NAME)
+ .tag(ERROR_BUDGET_TAG_NAME_RESPONSE_TYPE, ERROR_BUDGET_TAG_VALUE_RESPONSE_TYPE_FAILURE).counter();
+ assertEquals(4, timer.activeTasks());
+ assertEquals(0, successCounter.count(), 0.01);
+ assertEquals(0, failureCounter.count(), 0.01);
+
+ // T2 + 1, normalHost launched
+ clock.add(Duration.ofMinutes(1));
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList(timeoutHost, carryOverHost, cleanedUpHost));
+ sut.emitLaunchingMetrics();
+
+ assertEquals(3, timer.activeTasks());
+ assertEquals(1, successCounter.count(), 0.01);
+ assertEquals(0, failureCounter.count(), 0.01);
+
+ // T2 + 1 + LAUNCH_TIMEOUT_MINUTE, carryOverHost launched
+ clock.add(Duration.ofMinutes(MetricsEmitter.LAUNCH_TIMEOUT_MINUTE));
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList(timeoutHost, cleanedUpHost));
+ sut.emitLaunchingMetrics();
+
+ assertEquals(2, timer.activeTasks());
+ assertEquals(1, successCounter.count(), 0.01);
+ assertEquals(1, failureCounter.count(), 0.01);
+
+ // T1 + 3 + MAX_TRACK_DURATION_MINUTE, timeoutHost no longer in the list
+ clock.add(Duration.ofMinutes(MetricsEmitter.MAX_TRACK_DURATION_MINUTE - 2 * MetricsEmitter.LAUNCH_TIMEOUT_MINUTE));
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList(cleanedUpHost));
+ sut.emitLaunchingMetrics();
+
+ assertEquals(1, timer.activeTasks());
+ assertEquals(1, successCounter.count(), 0.01);
+ assertEquals(2, failureCounter.count(), 0.01);
+
+ // T2 + 2 + MAX_TRACK_DURATION_MINUTE, cleanedUpHost is cleaned up even if it
+ // appears in the list
+ clock.add(Duration.ofMinutes(MetricsEmitter.LAUNCH_TIMEOUT_MINUTE));
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList(cleanedUpHost));
+ sut.emitLaunchingMetrics();
+
+ assertEquals(0, timer.activeTasks());
+ assertEquals(1, successCounter.count(), 0.01);
+ assertEquals(3, failureCounter.count(), 0.01);
+
+ // When cleanedUpHost is removed from the list, the metrics won't change again
+ when(hostDAO.getAgentlessHosts(anyLong(), anyInt()))
+ .thenReturn(Arrays.asList());
+ sut.emitLaunchingMetrics();
+
+ assertEquals(0, timer.activeTasks());
+ assertEquals(1, successCounter.count(), 0.01);
+ assertEquals(3, failureCounter.count(), 0.01);
}
}
diff --git a/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimer.java b/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimer.java
new file mode 100644
index 0000000000..14cc028ea6
--- /dev/null
+++ b/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimer.java
@@ -0,0 +1,59 @@
+package com.pinterest.teletraan.universal.metrics.micrometer;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.LongTaskTimer;
+import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
+import io.micrometer.core.instrument.internal.CumulativeHistogramLongTaskTimer;
+
+/**
+ * This is a custom implementation of {@link LongTaskTimer}
+ * to support user supplied start time.
+ */
+public class PinStatsLongTaskTimer extends CumulativeHistogramLongTaskTimer {
+ private static final Logger LOG = LoggerFactory.getLogger(PinStatsLongTaskTimer.class);
+
+ private final Clock clock;
+
+ public PinStatsLongTaskTimer(Id id, Clock clock, TimeUnit baseTimeUnit,
+ DistributionStatisticConfig distributionStatisticConfig) {
+ super(id, clock, baseTimeUnit, distributionStatisticConfig);
+ this.clock = clock;
+ }
+
+ /**
+ * Start the timer with user supplied start time.
+ *
+ * This method can only provide an approximation of the start time. Therefore it
+ * is not suitable for high precision use cases. You shouldn't use a
+ * {@link LongTaskTimer} for tracking high precision durations anyways.
+ *
+ * If for any reason the start time cannot be set, the current time will be
+ * used.
+ *
+ * @param startTime start time
+ * @return the sample with specified start time
+ */
+ public Sample start(Instant startTime) {
+ Sample sample = start();
+ try {
+ long timeLapsed = clock.wallTime() - startTime.toEpochMilli();
+ long monotonicStartTime = clock.monotonicTime() - timeLapsed * 1000000;
+ // The class `SampleImpl` is not visible, so we have to use reflection to set
+ // the start time.
+ Class> sampleImplClass = sample.getClass();
+ Field field = sampleImplClass.getDeclaredField("startTime");
+ field.setAccessible(true);
+ field.set(sample, monotonicStartTime);
+ } catch (Exception e) {
+ LOG.error("Failed to set start time, use current time instead", e);
+ }
+ return sample;
+ }
+}
diff --git a/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsMeterRegistry.java b/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsMeterRegistry.java
index c98059d758..98b84b58f7 100644
--- a/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsMeterRegistry.java
+++ b/deploy-service/universal/src/main/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsMeterRegistry.java
@@ -1,7 +1,9 @@
package com.pinterest.teletraan.universal.metrics.micrometer;
import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.opentsdb.OpenTSDBMeterRegistry;
@@ -33,6 +35,11 @@ config, config().clock(), getBaseTimeUnit(), config().namingConvention())
: publisher;
}
+ @Override
+ protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) {
+ return new PinStatsLongTaskTimer(id, clock, getBaseTimeUnit(), distributionStatisticConfig);
+ }
+
@Override
protected void publish() {
for (List batch : MeterPartition.partition(this, config.batchSize())) {
diff --git a/deploy-service/universal/src/test/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimerTest.java b/deploy-service/universal/src/test/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimerTest.java
new file mode 100644
index 0000000000..067501e74f
--- /dev/null
+++ b/deploy-service/universal/src/test/java/com/pinterest/teletraan/universal/metrics/micrometer/PinStatsLongTaskTimerTest.java
@@ -0,0 +1,57 @@
+package com.pinterest.teletraan.universal.metrics.micrometer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.LongTaskTimer;
+import io.micrometer.core.instrument.LongTaskTimer.Sample;
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.MockClock;
+import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
+import io.micrometer.core.instrument.simple.SimpleConfig;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
+public class PinStatsLongTaskTimerTest {
+ private PinStatsLongTaskTimer sut;
+ private MockClock clock;
+ private MeterRegistry registry;
+
+ @BeforeEach
+ void setUp() {
+ clock = new MockClock();
+ registry = new SimpleMeterRegistry(SimpleConfig.DEFAULT, clock) {
+ @Override
+ protected LongTaskTimer newLongTaskTimer(Meter.Id id,
+ DistributionStatisticConfig distributionStatisticConfig) {
+ return new PinStatsLongTaskTimer(id, clock, getBaseTimeUnit(), distributionStatisticConfig);
+ }
+ };
+ sut = (PinStatsLongTaskTimer) LongTaskTimer.builder("my.ltt").register(registry);
+ }
+
+ @Test
+ void withoutParameter_start_nowAsStartTime() {
+ Instant startTime = Instant.now();
+ clock.add(startTime.getEpochSecond(), TimeUnit.SECONDS);
+ Sample sample = sut.start();
+ clock.addSeconds(1);
+ assertEquals(1000, sample.duration(TimeUnit.MILLISECONDS), 0.1);
+ }
+
+ @Test
+ void withInput_start_inputAsStartTime() {
+ Instant startTime = Instant.now();
+ // Set the clock to 5 seconds after startTime
+ clock.add(startTime.plusSeconds(5).minusMillis(clock.wallTime()).toEpochMilli(), TimeUnit.MILLISECONDS);
+ Sample sample = sut.start(startTime);
+ // Add another 5 seconds to the clock
+ clock.addSeconds(5);
+ assertEquals(10 * 1000, sample.duration(TimeUnit.MILLISECONDS), 0.1);
+ }
+}