Skip to content

Implement agent reuse, toggled by a cluster profile config option #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ apply from: "https://raw.githubusercontent.com/gocd/gocd-plugin-gradle-task-help

gocdPlugin {
id = 'cd.go.contrib.elasticagent.kubernetes'
pluginVersion = '4.1.0'
pluginVersion = '4.2.0'
goCdVersion = '21.4.0'
name = 'Kubernetes Elastic Agent Plugin'
description = 'Kubernetes Based Elastic Agent Plugins for GoCD'
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/cd/go/contrib/elasticagent/AgentInstances.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import cd.go.contrib.elasticagent.executors.ServerPingRequestExecutor;
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;

import java.util.Optional;
import java.util.function.Function;

/**
* Plugin implementors should implement these methods to interface to your cloud.
Expand All @@ -36,7 +38,7 @@ public interface AgentInstances<T> {
* @param pluginRequest the plugin request object
* @param consoleLogAppender appender that accepts status messages while the request is processed
*/
T create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;
Optional<T> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;

/**
* This message is sent when the plugin needs to terminate the agent instance.
Expand Down Expand Up @@ -84,5 +86,16 @@ public interface AgentInstances<T> {
* @param agentId the elastic agent id
*/
T find(String agentId);

/**
* Atomically update the agent instance for the given <code>agentId</code>.
* <code>computeFn</code> is called with the current agent instance if it exists,
* or null if it doesn't exist. <code>computeFn</code> should return a new agent instance
* that represents its new state.
*
* @param agentId   the elastic agent id
* @param computeFn function that maps the current agent instance (or null) to its new state
* @return the agent instance returned by <code>computeFn</code>
*/
T updateAgent(String agentId, Function<T, T> computeFn);
}

181 changes: 137 additions & 44 deletions src/main/java/cd/go/contrib/elasticagent/KubernetesAgentInstances.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@

import cd.go.contrib.elasticagent.model.JobIdentifier;
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;
import cd.go.contrib.elasticagent.KubernetesInstance.AgentState;
import cd.go.contrib.elasticagent.utils.Util;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.net.SocketTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;

import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
import static java.text.MessageFormat.format;

public class KubernetesAgentInstances implements AgentInstances<KubernetesInstance> {
private final ConcurrentHashMap<String, KubernetesInstance> instances = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, KubernetesInstance> instances;
public Clock clock = Clock.DEFAULT;
final Semaphore semaphore = new Semaphore(0, true);

private KubernetesClientFactory factory;
private KubernetesInstanceFactory kubernetesInstanceFactory;
Expand All @@ -50,55 +50,127 @@ public KubernetesAgentInstances(KubernetesClientFactory factory) {
}

/**
 * Creates an agent-instance registry that starts out with no registered instances.
 *
 * @param factory                   factory used to obtain Kubernetes clients
 * @param kubernetesInstanceFactory factory used to build {@link KubernetesInstance} objects
 */
public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory) {
    // Delegate to the full constructor with an empty initial registry.
    this(factory, kubernetesInstanceFactory, Map.of());
}

/**
 * Creates an agent-instance registry pre-populated with the given instances.
 * <p>
 * The entries of {@code initialInstances} are copied into a new {@link ConcurrentHashMap},
 * so later changes to the supplied map are not reflected in this registry.
 *
 * @param factory                   factory used to obtain Kubernetes clients
 * @param kubernetesInstanceFactory factory used to build {@link KubernetesInstance} objects
 * @param initialInstances          instances to register up front; copied, must not be null
 */
public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory, Map<String, KubernetesInstance> initialInstances) {
    this.instances = new ConcurrentHashMap<>(initialInstances);
    this.kubernetesInstanceFactory = kubernetesInstanceFactory;
    this.factory = factory;
}

@Override
public KubernetesInstance create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
final Integer maxAllowedContainers = settings.getMaxPendingPods();
public Optional<KubernetesInstance> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
final Integer maxAllowedPods = settings.getMaxPendingPods();
synchronized (instances) {
refreshAll(settings);
doWithLockOnSemaphore(new SetupSemaphore(maxAllowedContainers, instances, semaphore));
consoleLogAppender.accept("Waiting to create agent pod.");
if (semaphore.tryAcquire()) {
return createKubernetesInstance(request, settings, pluginRequest, consoleLogAppender);
if (instances.size() < maxAllowedPods) {
return requestCreateAgentHelper(request, settings, pluginRequest, consoleLogAppender);
} else {
String message = format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit ({0}). Total kubernetes pods ({1}). Not creating any more containers.", maxAllowedContainers, instances.size());
String message = String.format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit (%s). Total kubernetes pods (%s). Not creating any more pods.",
maxAllowedPods,
instances.size());
LOG.warn(message);
consoleLogAppender.accept(message);
return null;
return Optional.empty();
}
}
}

private void doWithLockOnSemaphore(Runnable runnable) {
synchronized (semaphore) {
runnable.run();
/**
 * Collects the registered instances that can serve the given create-agent request.
 * <p>
 * An instance qualifies when either:
 * <ul>
 *   <li>its job id equals the job id of the request, or</li>
 *   <li>its {@code ELASTIC_CONFIG_HASH} pod annotation equals the config hash computed from the
 *       request's cluster and elastic profile properties, its agent state is {@code Idle}
 *       and its pod state is {@code Running}.</li>
 * </ul>
 * The outcome of the second check is logged at info level for every instance it is applied to.
 *
 * @param request the create-agent request to find matching pods for
 * @return the eligible instances; empty when none qualify
 */
private List<KubernetesInstance> findPodsEligibleForReuse(CreateAgentRequest request) {
    final Long requestedJobId = request.jobIdentifier().getJobId();
    final String requestedConfigHash = KubernetesInstanceFactory.agentConfigHash(
            request.clusterProfileProperties(), request.elasticProfileProperties());

    final List<KubernetesInstance> matches = new ArrayList<>();
    for (KubernetesInstance candidate : instances.values()) {
        // A pod already tied to this job qualifies outright; the reuse checks (and their log line) are skipped.
        final boolean alreadyAssignedToJob = candidate.getJobId().equals(requestedJobId);
        if (!alreadyAssignedToJob) {
            final String candidateConfigHash = candidate.getPodAnnotations().get(KubernetesInstance.ELASTIC_CONFIG_HASH);
            final boolean hashesMatch = Objects.equals(candidateConfigHash, requestedConfigHash);
            final boolean agentIdle = candidate.getAgentState().equals(KubernetesInstance.AgentState.Idle);
            final boolean podRunning = candidate.getPodState().equals(PodState.Running);
            final boolean reusable = hashesMatch && agentIdle && podRunning;

            LOG.info(
                    "[reuse] Is pod {} reusable for job {}? {}. Job has {}={}; pod has {}={}, agentState={}, podState={}",
                    candidate.getPodName(),
                    requestedJobId,
                    reusable,
                    KubernetesInstance.ELASTIC_CONFIG_HASH,
                    requestedConfigHash,
                    KubernetesInstance.ELASTIC_CONFIG_HASH,
                    candidateConfigHash,
                    candidate.getAgentState(),
                    candidate.getPodState()
            );

            if (!reusable) {
                continue;
            }
        }
        matches.add(candidate);
    }

    return matches;
}

private KubernetesInstance createKubernetesInstance(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {

private Optional<KubernetesInstance> requestCreateAgentHelper(
CreateAgentRequest request,
PluginSettings settings,
PluginRequest pluginRequest,
ConsoleLogAppender consoleLogAppender) {
JobIdentifier jobIdentifier = request.jobIdentifier();
if (isAgentCreatedForJob(jobIdentifier.getJobId())) {
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
LOG.warn(message);
consoleLogAppender.accept(message);
return null;
Long jobId = jobIdentifier.getJobId();

// Agent reuse disabled - create a new pod only if one hasn't already been created for this job ID.
if (!settings.getEnableAgentReuse()) {
// Already created a pod for this job ID.
if (isAgentCreatedForJob(jobId)) {
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
LOG.warn(message);
consoleLogAppender.accept(message);
return Optional.empty();
}
// No pod created yet for this job ID. Create one.
KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
instance = instance.toBuilder().agentState(AgentState.Building).build();
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
return Optional.of(instance);
}

KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Creating pod: %s", instance.name()));
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.name()));
// Agent reuse enabled - look for any extant pods that match this job,
// and create a new one only if there are none.
List<KubernetesInstance> reusablePods = findPodsEligibleForReuse(request);
LOG.info("[reuse] Found {} pods eligible for reuse for CreateAgentRequest for job {}: {}",
reusablePods.size(),
jobId,
reusablePods.stream().map(pod -> pod.getPodName()).collect(Collectors.toList()));

return instance;
if (reusablePods.isEmpty()) {
KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
instance = instance.toBuilder().agentState(AgentState.Building).build();
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
return Optional.of(instance);
} else {
String message = String.format("[reuse] Not creating a new pod - found %s eligible for reuse.", reusablePods.size());
consoleLogAppender.accept(message);
LOG.info(message);
return Optional.empty();
}
}

private boolean isAgentCreatedForJob(Long jobId) {
for (KubernetesInstance instance : instances.values()) {
if (instance.jobId().equals(jobId)) {
if (instance.getJobId().equals(jobId)) {
return true;
}
}
Expand All @@ -111,7 +183,7 @@ public void terminate(String agentId, PluginSettings settings) {
KubernetesInstance instance = instances.get(agentId);
if (instance != null) {
KubernetesClient client = factory.client(settings);
instance.terminate(client);
client.pods().withName(instance.getPodName()).delete();
} else {
LOG.warn(format("Requested to terminate an instance that does not exist {0}.", agentId));
}
Expand Down Expand Up @@ -140,56 +212,77 @@ public Agents instancesCreatedAfterTimeout(PluginSettings settings, Agents agent
continue;
}

if (clock.now().isAfter(instance.createdAt().plus(settings.getAutoRegisterPeriod()))) {
if (clock.now().isAfter(instance.getCreatedAt().plus(settings.getAutoRegisterPeriod()))) {
oldAgents.add(agent);
}
}
return new Agents(oldAgents);
}

/**
 * Lists the pods that carry the elastic-agent kind label
 * ({@code Constants.KUBERNETES_POD_KIND_LABEL_KEY} = {@code Constants.KUBERNETES_POD_KIND_LABEL_VALUE}).
 *
 * @param client the Kubernetes client to query; must not be null
 * @return the pods matching the label selector
 * @throws IllegalArgumentException if {@code client} is null
 */
public List<Pod> listAgentPods(KubernetesClient client) {
    if (client != null) {
        return client.pods()
                .withLabel(Constants.KUBERNETES_POD_KIND_LABEL_KEY, Constants.KUBERNETES_POD_KIND_LABEL_VALUE)
                .list()
                .getItems();
    }
    throw new IllegalArgumentException("client is null");
}

@Override
public void refreshAll(PluginSettings properties) {
LOG.debug("[Refresh Instances] Syncing k8s elastic agent pod information for cluster {}.", properties);
PodList list = null;
List<Pod> pods = null;
try {
KubernetesClient client = factory.client(properties);
list = client.pods().list();
pods = listAgentPods(client);
} catch (Exception e) {
LOG.error("Error occurred while trying to list kubernetes pods:", e);

if (e.getCause() instanceof SocketTimeoutException) {
LOG.error("Error caused due to SocketTimeoutException. This generally happens due to stale kubernetes client. Clearing out existing kubernetes client and creating a new one!");
factory.clearOutExistingClient();
KubernetesClient client = factory.client(properties);
list = client.pods().list();
pods = listAgentPods(client);
}
}

if (list == null) {
if (pods == null) {
LOG.info("Did not find any running kubernetes pods.");
return;
}

Map<String, KubernetesInstance> oldInstances = Map.copyOf(instances);
instances.clear();
for (Pod pod : list.getItems()) {
Map<String, String> podLabels = pod.getMetadata().getLabels();
if (podLabels != null) {
if (Constants.KUBERNETES_POD_KIND_LABEL_VALUE.equals(podLabels.get(Constants.KUBERNETES_POD_KIND_LABEL_KEY))) {
register(kubernetesInstanceFactory.fromKubernetesPod(pod));
}

for (Pod pod : pods) {
String podName = pod.getMetadata().getName();
// preserve pod's agent state
KubernetesInstance newInstance = kubernetesInstanceFactory.fromKubernetesPod(pod);
KubernetesInstance oldInstance = oldInstances.get(podName);
if (oldInstance != null) {
AgentState oldAgentState = oldInstances.get(podName).getAgentState();
newInstance = newInstance.toBuilder().agentState(oldAgentState).build();
LOG.debug("[reuse] Preserved AgentState {} upon refresh of pod {}", oldAgentState, podName);
}
register(newInstance);
}

LOG.info(String.format("[refresh-pod-state] Pod information successfully synced. All(Running/Pending) pod count is %d.", instances.size()));
}

/**
 * Atomically replaces the instance registered under {@code agentId} with the result of
 * {@code updateFn}, using {@link ConcurrentHashMap#compute}.
 * <p>
 * {@code updateFn} receives the currently registered instance, or {@code null} when none is
 * registered. Per the contract of {@code compute}, a {@code null} result removes the mapping.
 *
 * @param agentId  the elastic agent id
 * @param updateFn maps the current instance (or null) to the instance that should be stored
 * @return the instance now registered under {@code agentId}, or null if there is none
 */
@Override
public KubernetesInstance updateAgent(String agentId, Function<KubernetesInstance, KubernetesInstance> updateFn) {
    return instances.compute(agentId, (key, current) -> updateFn.apply(current));
}

/**
 * Looks up the instance currently registered under the given agent id.
 *
 * @param agentId the elastic agent id
 * @return the registered instance, or null when no instance is registered under that id
 */
@Override
public KubernetesInstance find(String agentId) {
return instances.get(agentId);
}

public void register(KubernetesInstance instance) {
instances.put(instance.name(), instance);
instances.put(instance.getPodName(), instance);
}

private KubernetesAgentInstances unregisteredAfterTimeout(PluginSettings settings, Agents knownAgents) throws Exception {
Expand Down
Loading