From a5a9ee9abd34d6a0e5696d08c7497bf50811e2f4 Mon Sep 17 00:00:00 2001 From: Matteo Saloni Date: Wed, 5 Jun 2024 17:05:46 +0200 Subject: [PATCH] feat: python serve task --- .../k8s/K8sDeploymentFramework.java | 136 +++++++++++++- .../infrastructure/k8s/K8sJobFramework.java | 28 ++- .../infrastructure/k8s/K8sServeFramework.java | 80 ++++++++ .../k8s/runnables/K8sDeploymentRunnable.java | 10 + .../k8s/runnables/K8sServeRunnable.java | 9 + .../runtime/python/PythonRuntime.java | 50 ++++- .../python/builders/PythonServeBuilder.java | 24 +++ .../python/model/NuclioFunctionBuilder.java | 20 +- .../python/model/NuclioFunctionSpec.java | 2 + .../python/runners/PythonJobRunner.java | 2 +- .../python/runners/PythonServeRunner.java | 172 ++++++++++++++++++ .../python/specs/run/PythonRunSpec.java | 9 + .../specs/task/PythonServeTaskSpec.java | 47 +++++ .../task/PythonServeTaskSpecFactory.java | 20 ++ 14 files changed, 593 insertions(+), 16 deletions(-) create mode 100644 modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/builders/PythonServeBuilder.java create mode 100644 modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonServeRunner.java create mode 100644 modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpec.java create mode 100644 modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpecFactory.java diff --git a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sDeploymentFramework.java b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sDeploymentFramework.java index 1b1cff4af..e19f3ffe0 100644 --- a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sDeploymentFramework.java +++ b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sDeploymentFramework.java @@ -4,6 +4,7 @@ import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.models.V1ConfigMap; import io.kubernetes.client.openapi.models.V1Container; import io.kubernetes.client.openapi.models.V1Deployment; import io.kubernetes.client.openapi.models.V1DeploymentSpec; @@ -19,16 +20,25 @@ import io.kubernetes.client.openapi.models.V1VolumeMount; import it.smartcommunitylabdhub.commons.annotations.infrastructure.FrameworkComponent; import it.smartcommunitylabdhub.commons.models.enums.State; +import it.smartcommunitylabdhub.commons.utils.MapUtils; import it.smartcommunitylabdhub.framework.k8s.exceptions.K8sFrameworkException; +import it.smartcommunitylabdhub.framework.k8s.model.ContextRef; +import it.smartcommunitylabdhub.framework.k8s.model.ContextSource; +import it.smartcommunitylabdhub.framework.k8s.objects.CoreVolume; import it.smartcommunitylabdhub.framework.k8s.runnables.K8sDeploymentRunnable; import jakarta.validation.constraints.NotNull; import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.util.Assert; @Slf4j @@ -42,6 +52,9 @@ public class K8sDeploymentFramework extends K8sBaseFramework() {}; private final AppsV1Api appsV1Api; + @Value("${kaniko.init-image}") + private String initImage; + public K8sDeploymentFramework(ApiClient apiClient) { super(apiClient); appsV1Api = new AppsV1Api(apiClient); @@ -55,12 +68,67 @@ public K8sDeploymentRunnable run(K8sDeploymentRunnable runnable) throws K8sFrame } V1Deployment deployment = build(runnable); + //secrets V1Secret secret = buildRunSecret(runnable); if (secret != null) { storeRunSecret(secret); } + //check context refs and build config + if (runnable.getContextRefs() != null || runnable.getContextSources() != null) { + //build and create configMap + //TODO move to shared method + try { + // Generate Config map + Optional> contextRefsOpt = Optional.ofNullable(runnable.getContextRefs()); + Optional> contextSourcesOpt = Optional.ofNullable(runnable.getContextSources()); + V1ConfigMap configMap = new V1ConfigMap() + .metadata( + new V1ObjectMeta().name("init-config-map-" + runnable.getId()).labels(buildLabels(runnable)) + ) + .data( + MapUtils.mergeMultipleMaps( + // Generate context-refs.txt if exist + contextRefsOpt + .map(contextRefsList -> + Map.of( + "context-refs.txt", + contextRefsList + .stream() + .map(v -> + v.getProtocol() + "," + v.getDestination() + "," + v.getSource() + "\n" + ) + .collect(Collectors.joining("")) + ) + ) + .orElseGet(Map::of), + // Generate context-sources.txt if exist + contextSourcesOpt + .map(contextSources -> + contextSources + .stream() + .collect( + Collectors.toMap( + ContextSource::getName, + c -> + new String( + Base64.getUrlDecoder().decode(c.getBase64()), + StandardCharsets.UTF_8 + ) + ) + ) + ) + .orElseGet(Map::of) + ) + ); + + coreV1Api.createNamespacedConfigMap(namespace, configMap, null, null, null, null); + } catch (ApiException | NullPointerException e) { + throw new K8sFrameworkException(e.getMessage()); + } + } + log.info("create deployment for {}", String.valueOf(deployment.getMetadata().getName())); deployment = create(deployment); @@ -95,12 +163,24 @@ public K8sDeploymentRunnable delete(K8sDeploymentRunnable runnable) throws K8sFr runnable.setState(State.DELETED.name()); return runnable; } - //secrets - cleanRunSecret(runnable); log.info("delete deployment for {}", String.valueOf(deployment.getMetadata().getName())); delete(deployment); + //secrets + cleanRunSecret(runnable); + + //init config map + try { + String configMapName = "init-config-map-" + runnable.getId(); + V1ConfigMap initConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace, null); + if (initConfigMap != null) { + coreV1Api.deleteNamespacedConfigMap(configMapName, namespace, null, null, null, null, null, null); + } + } catch (ApiException | NullPointerException e) { + //ignore, not existing or error + } + //update results try { runnable.setResults(Map.of("deployment", mapper.convertValue(deployment, typeRef))); @@ -190,6 +270,41 @@ public V1Deployment build(K8sDeploymentRunnable runnable) throws K8sFrameworkExc List command = buildCommand(runnable); List args = buildArgs(runnable); + //check if context build is required + if (runnable.getContextRefs() != null || runnable.getContextSources() != null) { + // Create sharedVolume + CoreVolume sharedVolume = new CoreVolume( + CoreVolume.VolumeType.empty_dir, + "/shared", + "shared-dir", + Map.of("sizeLimit", "100Mi") + ); + + // Create config map volume + CoreVolume configMapVolume = new CoreVolume( + CoreVolume.VolumeType.config_map, + "/init-config-map", + "init-config-map", + Map.of("name", "init-config-map-" + runnable.getId()) + ); + + List initVolumes = List.of( + k8sBuilderHelper.getVolume(sharedVolume), + k8sBuilderHelper.getVolume(configMapVolume) + ); + List initVolumesMounts = List.of( + k8sBuilderHelper.getVolumeMount(sharedVolume), + k8sBuilderHelper.getVolumeMount(configMapVolume) + ); + + //add volume + volumes = Stream.concat(buildVolumes(runnable).stream(), initVolumes.stream()).collect(Collectors.toList()); + volumeMounts = + Stream + .concat(buildVolumeMounts(runnable).stream(), initVolumesMounts.stream()) + .collect(Collectors.toList()); + } + // Build Container V1Container container = new V1Container() .name(containerName) @@ -213,6 +328,23 @@ public V1Deployment build(K8sDeploymentRunnable runnable) throws K8sFrameworkExc .restartPolicy("Always") .imagePullSecrets(buildImagePullSecrets(runnable)); + //check if context build is required + if (runnable.getContextRefs() != null || runnable.getContextSources() != null) { + // Add Init container to the PodTemplateSpec + // Build the Init Container + V1Container initContainer = new V1Container() + .name("init-container-" + runnable.getId()) + .image(initImage) + .volumeMounts(volumeMounts) + .resources(resources) + .env(env) + .envFrom(envFrom) + //TODO below execute a command that is a Go script + .command(List.of("/bin/bash", "-c", "/app/builder-tool.sh")); + + podSpec.setInitContainers(Collections.singletonList(initContainer)); + } + // Create a PodTemplateSpec with the PodSpec V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec().metadata(metadata).spec(podSpec); diff --git a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sJobFramework.java b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sJobFramework.java index 78dd104e2..5c2db20b9 100644 --- a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sJobFramework.java +++ b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sJobFramework.java @@ -174,6 +174,19 @@ public K8sJobRunnable stop(K8sJobRunnable runnable) throws K8sFrameworkException //stop by deleting log.info("delete job for {}", String.valueOf(job.getMetadata().getName())); delete(job); + //secrets + cleanRunSecret(runnable); + + //init config map + try { + String configMapName = "init-config-map-" + runnable.getId(); + V1ConfigMap initConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace, null); + if (initConfigMap != null) { + coreV1Api.deleteNamespacedConfigMap(configMapName, namespace, null, null, null, null, null, null); + } + } catch (ApiException | NullPointerException e) { + //ignore, not existing or error + } //update state runnable.setState(State.STOPPED.name()); @@ -200,11 +213,22 @@ public K8sJobRunnable delete(K8sJobRunnable runnable) throws K8sFrameworkExcepti return runnable; } + log.info("delete job for {}", String.valueOf(job.getMetadata().getName())); + delete(job); + //secrets cleanRunSecret(runnable); - log.info("delete job for {}", String.valueOf(job.getMetadata().getName())); - delete(job); + //init config map + try { + String configMapName = "init-config-map-" + runnable.getId(); + V1ConfigMap initConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace, null); + if (initConfigMap != null) { + coreV1Api.deleteNamespacedConfigMap(configMapName, namespace, null, null, null, null, null, null); + } + } catch (ApiException | NullPointerException e) { + //ignore, not existing or error + } //update state runnable.setState(State.DELETED.name()); diff --git a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sServeFramework.java b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sServeFramework.java index 13a21ccd1..b79583f5c 100644 --- a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sServeFramework.java +++ b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/infrastructure/k8s/K8sServeFramework.java @@ -4,6 +4,7 @@ import io.kubernetes.client.custom.IntOrString; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.V1ConfigMap; import io.kubernetes.client.openapi.models.V1Deployment; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Service; @@ -11,11 +12,16 @@ import io.kubernetes.client.openapi.models.V1ServiceSpec; import it.smartcommunitylabdhub.commons.annotations.infrastructure.FrameworkComponent; import it.smartcommunitylabdhub.commons.models.enums.State; +import it.smartcommunitylabdhub.commons.utils.MapUtils; import it.smartcommunitylabdhub.framework.k8s.exceptions.K8sFrameworkException; +import it.smartcommunitylabdhub.framework.k8s.model.ContextRef; +import it.smartcommunitylabdhub.framework.k8s.model.ContextSource; import it.smartcommunitylabdhub.framework.k8s.runnables.K8sDeploymentRunnable; import it.smartcommunitylabdhub.framework.k8s.runnables.K8sServeRunnable; import jakarta.validation.constraints.NotNull; import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -23,6 +29,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.util.Assert; @Slf4j @@ -34,6 +41,9 @@ public class K8sServeFramework extends K8sBaseFramework >() {}; + @Value("${kaniko.init-image}") + private String initImage; + @Autowired private K8sDeploymentFramework deploymentFramework; @@ -51,6 +61,60 @@ public K8sServeRunnable run(K8sServeRunnable runnable) throws K8sFrameworkExcept // Create a deployment from a Deployment+Service V1Deployment deployment = buildDeployment(runnable); + //check context refs and build config + if (runnable.getContextRefs() != null || runnable.getContextSources() != null) { + //build and create configMap + //TODO move to shared method + try { + // Generate Config map + Optional> contextRefsOpt = Optional.ofNullable(runnable.getContextRefs()); + Optional> contextSourcesOpt = Optional.ofNullable(runnable.getContextSources()); + V1ConfigMap configMap = new V1ConfigMap() + .metadata( + new V1ObjectMeta().name("init-config-map-" + runnable.getId()).labels(buildLabels(runnable)) + ) + .data( + MapUtils.mergeMultipleMaps( + // Generate context-refs.txt if exist + contextRefsOpt + .map(contextRefsList -> + Map.of( + "context-refs.txt", + contextRefsList + .stream() + .map(v -> + v.getProtocol() + "," + v.getDestination() + "," + v.getSource() + "\n" + ) + .collect(Collectors.joining("")) + ) + ) + .orElseGet(Map::of), + // Generate context-sources.txt if exist + contextSourcesOpt + .map(contextSources -> + contextSources + .stream() + .collect( + Collectors.toMap( + ContextSource::getName, + c -> + new String( + Base64.getUrlDecoder().decode(c.getBase64()), + StandardCharsets.UTF_8 + ) + ) + ) + ) + .orElseGet(Map::of) + ) + ); + + coreV1Api.createNamespacedConfigMap(namespace, configMap, null, null, null, null); + } catch (ApiException | NullPointerException e) { + throw new K8sFrameworkException(e.getMessage()); + } + } + log.info("create deployment for {}", String.valueOf(deployment.getMetadata().getName())); deployment = deploymentFramework.create(deployment); @@ -106,6 +170,20 @@ public K8sServeRunnable delete(K8sServeRunnable runnable) throws K8sFrameworkExc deploymentFramework.delete(deployment); } + //secrets + cleanRunSecret(runnable); + + //init config map + try { + String configMapName = "init-config-map-" + runnable.getId(); + V1ConfigMap initConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace, null); + if (initConfigMap != null) { + coreV1Api.deleteNamespacedConfigMap(configMapName, namespace, null, null, null, null, null, null); + } + } catch (ApiException | NullPointerException e) { + //ignore, not existing or error + } + V1Service service; try { // Retrieve the service @@ -352,6 +430,8 @@ private K8sDeploymentRunnable getDeployment(K8sServeRunnable runnable) { .state(runnable.getState()) .tolerations(runnable.getTolerations()) .volumes(runnable.getVolumes()) + .contextRefs(runnable.getContextRefs()) + .contextSources(runnable.getContextSources()) .build(); } } diff --git a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sDeploymentRunnable.java b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sDeploymentRunnable.java index c62ba2232..65816ceb9 100644 --- a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sDeploymentRunnable.java +++ b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sDeploymentRunnable.java @@ -1,7 +1,11 @@ package it.smartcommunitylabdhub.framework.k8s.runnables; +import com.fasterxml.jackson.annotation.JsonProperty; import it.smartcommunitylabdhub.commons.annotations.infrastructure.RunnableComponent; import it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s.K8sDeploymentFramework; +import it.smartcommunitylabdhub.framework.k8s.model.ContextRef; +import it.smartcommunitylabdhub.framework.k8s.model.ContextSource; +import java.util.List; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -16,6 +20,12 @@ @AllArgsConstructor public class K8sDeploymentRunnable extends K8sRunnable { + @JsonProperty("context_refs") + private List contextRefs; + + @JsonProperty("context_sources") + private List contextSources; + private Integer replicas; @Override diff --git a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sServeRunnable.java b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sServeRunnable.java index b486b43e5..36a61a8e7 100644 --- a/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sServeRunnable.java +++ b/modules/framework-k8s/src/main/java/it/smartcommunitylabdhub/framework/k8s/runnables/K8sServeRunnable.java @@ -1,7 +1,10 @@ package it.smartcommunitylabdhub.framework.k8s.runnables; +import com.fasterxml.jackson.annotation.JsonProperty; import it.smartcommunitylabdhub.commons.annotations.infrastructure.RunnableComponent; import it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s.K8sServeFramework; +import it.smartcommunitylabdhub.framework.k8s.model.ContextRef; +import it.smartcommunitylabdhub.framework.k8s.model.ContextSource; import it.smartcommunitylabdhub.framework.k8s.objects.CorePort; import it.smartcommunitylabdhub.framework.k8s.objects.CoreServiceType; import java.util.List; @@ -19,6 +22,12 @@ @NoArgsConstructor public class K8sServeRunnable extends K8sRunnable { + @JsonProperty("context_refs") + private List contextRefs; + + @JsonProperty("context_sources") + private List contextSources; + private List servicePorts; private CoreServiceType serviceType; diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/PythonRuntime.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/PythonRuntime.java index 9e0802276..724cf9100 100644 --- a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/PythonRuntime.java +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/PythonRuntime.java @@ -21,16 +21,21 @@ import it.smartcommunitylabdhub.commons.services.entities.FunctionService; import it.smartcommunitylabdhub.commons.services.entities.SecretService; import it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s.K8sJobFramework; +import it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s.K8sServeFramework; import it.smartcommunitylabdhub.framework.k8s.model.K8sLogStatus; import it.smartcommunitylabdhub.framework.k8s.objects.CoreLog; import it.smartcommunitylabdhub.framework.k8s.objects.CoreMetric; import it.smartcommunitylabdhub.framework.k8s.runnables.K8sJobRunnable; import it.smartcommunitylabdhub.framework.k8s.runnables.K8sRunnable; +import it.smartcommunitylabdhub.framework.k8s.runnables.K8sServeRunnable; import it.smartcommunitylabdhub.runtime.python.builders.PythonJobBuilder; +import it.smartcommunitylabdhub.runtime.python.builders.PythonServeBuilder; import it.smartcommunitylabdhub.runtime.python.runners.PythonJobRunner; +import it.smartcommunitylabdhub.runtime.python.runners.PythonServeRunner; import it.smartcommunitylabdhub.runtime.python.specs.function.PythonFunctionSpec; import it.smartcommunitylabdhub.runtime.python.specs.run.PythonRunSpec; import it.smartcommunitylabdhub.runtime.python.specs.task.PythonJobTaskSpec; +import it.smartcommunitylabdhub.runtime.python.specs.task.PythonServeTaskSpec; import it.smartcommunitylabdhub.runtime.python.status.PythonRunStatus; import jakarta.validation.constraints.NotNull; import java.io.Serializable; @@ -57,6 +62,7 @@ public class PythonRuntime implements Runtime jobRunnableStore; + @Autowired(required = false) + private RunnableStore serveRunnableStore; + @Autowired private FunctionService functionService; @@ -96,6 +105,10 @@ public PythonRunSpec build(@NotNull Executable function, @NotNull Task task, @No PythonJobTaskSpec taskJobSpec = new PythonJobTaskSpec(task.getSpec()); return jobBuilder.build(funSpec, taskJobSpec, runSpec); } + case PythonServeTaskSpec.KIND -> { + PythonServeTaskSpec taskServeSpec = new PythonServeTaskSpec(task.getSpec()); + return serveBuilder.build(funSpec, taskServeSpec, runSpec); + } default -> throw new IllegalArgumentException( "Kind not recognized. Cannot retrieve the right builder or specialize Spec for Run and Task." ); @@ -124,6 +137,13 @@ public RunRunnable run(@NotNull Run run) { secretService.groupSecrets(run.getProject(), runPythonSpec.getTaskJobSpec().getSecrets()) ) .produce(run); + case PythonServeTaskSpec.KIND -> new PythonServeRunner( + image, + command, + runPythonSpec.getFunctionSpec(), + secretService.groupSecrets(run.getProject(), runPythonSpec.getTaskJobSpec().getSecrets()) + ) + .produce(run); default -> throw new IllegalArgumentException("Kind not recognized. Cannot retrieve the right Runner"); }; } @@ -148,12 +168,23 @@ public RunRunnable stop(Run run) throws NoSuchEntityException { if (jobRunnableStore != null) { K8sJobRunnable k8sJobRunnable = jobRunnableStore.find(run.getId()); if (k8sJobRunnable == null) { - throw new NoSuchEntityException("JobDeployment not found"); + throw new NoSuchEntityException("Run not found"); } k8sJobRunnable.setState(State.STOP.name()); yield k8sJobRunnable; } - throw new CoreRuntimeException("Job Store is not available"); + throw new CoreRuntimeException("Store is not available"); + } + case PythonServeTaskSpec.KIND -> { + if (serveRunnableStore != null) { + K8sServeRunnable k8sServeRunnable = serveRunnableStore.find(run.getId()); + if (k8sServeRunnable == null) { + throw new NoSuchEntityException("Run not found"); + } + k8sServeRunnable.setState(State.STOP.name()); + yield k8sServeRunnable; + } + throw new CoreRuntimeException("Store is not available"); } default -> throw new IllegalArgumentException("Kind not recognized. Cannot retrieve the right Runner"); }; @@ -191,6 +222,18 @@ public RunRunnable delete(Run run) throws NoSuchEntityException { } throw new CoreRuntimeException("Job Store is not available"); } + case PythonServeTaskSpec.KIND -> { + if (jobRunnableStore != null) { + K8sServeRunnable k8sServeRunnable = serveRunnableStore.find(run.getId()); + if (k8sServeRunnable == null) { + //not in store, either not existent or already removed, nothing to do + yield null; + } + k8sServeRunnable.setState(State.DELETING.name()); + yield k8sServeRunnable; + } + throw new CoreRuntimeException("Job Store is not available"); + } default -> throw new IllegalArgumentException("Kind not recognized. Cannot retrieve the right Runner"); }; } catch (StoreException e) { @@ -484,6 +527,9 @@ private RunnableStore getStore(RunRunnable runnable) { case K8sJobFramework.FRAMEWORK -> { yield jobRunnableStore; } + case K8sServeFramework.FRAMEWORK -> { + yield serveRunnableStore; + } default -> null; }; } diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/builders/PythonServeBuilder.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/builders/PythonServeBuilder.java new file mode 100644 index 000000000..e64f5465d --- /dev/null +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/builders/PythonServeBuilder.java @@ -0,0 +1,24 @@ +package it.smartcommunitylabdhub.runtime.python.builders; + +import it.smartcommunitylabdhub.commons.infrastructure.Builder; +import it.smartcommunitylabdhub.runtime.python.specs.function.PythonFunctionSpec; +import it.smartcommunitylabdhub.runtime.python.specs.run.PythonRunSpec; +import it.smartcommunitylabdhub.runtime.python.specs.task.PythonServeTaskSpec; +import java.util.Optional; + +public class PythonServeBuilder implements Builder { + + @Override + public PythonRunSpec build(PythonFunctionSpec funSpec, PythonServeTaskSpec taskSpec, PythonRunSpec runSpec) { + PythonRunSpec pythonSpec = new PythonRunSpec(runSpec.toMap()); + pythonSpec.setTaskServeSpec(taskSpec); + pythonSpec.setFunctionSpec(funSpec); + + //let run override k8s specs + Optional + .ofNullable(runSpec.getTaskServeSpec()) + .ifPresent(k8sSpec -> pythonSpec.getTaskServeSpec().configure(k8sSpec.toMap())); + + return pythonSpec; + } +} diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionBuilder.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionBuilder.java index 87f2f5cc4..4846e9876 100644 --- a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionBuilder.java +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionBuilder.java @@ -29,16 +29,18 @@ public static HashMap build(NuclioFunctionSpec fn) { spec.put("minReplicas", fn.getMinReplicas() != null ? fn.getMinReplicas() : 1); spec.put("maxReplicas", fn.getMaxReplicas() != null ? fn.getMaxReplicas() : 1); - //build trigger - HashMap trigger = (HashMap) spec.getOrDefault( - "trigger", - new HashMap<>() - ); + HashMap triggers = fn.getTriggers() != null + ? new HashMap(fn.getTriggers()) + : new HashMap<>(); - HashMap attributes = new HashMap<>(Map.of("event", fn.getEvent())); - HashMap job = new HashMap<>(Map.of("kind", "job", "attributes", attributes)); - trigger.put("job", job); - spec.put("triggers", trigger); + //build default trigger if empty + if (triggers.isEmpty()) { + HashMap attributes = new HashMap<>(Map.of("event", fn.getEvent())); + HashMap job = new HashMap<>(Map.of("kind", "job", "attributes", attributes)); + triggers.put("job", job); + } + + spec.put("triggers", triggers); HashMap map = new HashMap<>(); map.put("apiVersion", "nuclio.io/v1beta1"); diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionSpec.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionSpec.java index a7687f810..3f32634de 100644 --- a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionSpec.java +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/model/NuclioFunctionSpec.java @@ -1,6 +1,7 @@ package it.smartcommunitylabdhub.runtime.python.model; import java.io.Serializable; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -19,6 +20,7 @@ public class NuclioFunctionSpec { private String runtime; private String handler; private Serializable event; + private Map triggers; private Integer minReplicas; private Integer maxReplicas; diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonJobRunner.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonJobRunner.java index cc8d48944..5094cc5d3 100644 --- a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonJobRunner.java +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonJobRunner.java @@ -75,7 +75,7 @@ public K8sRunnable produce(Run run) { .builder() .runtime("python") //invoke user code wrapped via default handler - .handler("run_handler:handler") + .handler("run_handler:handler_job") //directly invoke user code // .handler("main:" + runSpec.getFunctionSpec().getSource().getHandler()) .event(event) diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonServeRunner.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonServeRunner.java new file mode 100644 index 000000000..878a3ce07 --- /dev/null +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/runners/PythonServeRunner.java @@ -0,0 +1,172 @@ +package it.smartcommunitylabdhub.runtime.python.runners; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import it.smartcommunitylabdhub.commons.infrastructure.Runner; +import it.smartcommunitylabdhub.commons.jackson.JacksonMapper; +import it.smartcommunitylabdhub.commons.models.entities.run.Run; +import it.smartcommunitylabdhub.commons.models.enums.State; +import it.smartcommunitylabdhub.commons.models.objects.SourceCode; +import it.smartcommunitylabdhub.framework.k8s.model.ContextRef; +import it.smartcommunitylabdhub.framework.k8s.model.ContextSource; +import it.smartcommunitylabdhub.framework.k8s.objects.CoreEnv; +import it.smartcommunitylabdhub.framework.k8s.objects.CorePort; +import it.smartcommunitylabdhub.framework.k8s.runnables.K8sRunnable; +import it.smartcommunitylabdhub.framework.k8s.runnables.K8sServeRunnable; +import it.smartcommunitylabdhub.runtime.python.PythonRuntime; +import it.smartcommunitylabdhub.runtime.python.model.NuclioFunctionBuilder; +import it.smartcommunitylabdhub.runtime.python.model.NuclioFunctionSpec; +import it.smartcommunitylabdhub.runtime.python.specs.function.PythonFunctionSpec; +import it.smartcommunitylabdhub.runtime.python.specs.function.PythonFunctionSpec.PythonSourceCodeLanguages; +import it.smartcommunitylabdhub.runtime.python.specs.run.PythonRunSpec; +import it.smartcommunitylabdhub.runtime.python.specs.task.PythonServeTaskSpec; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.springframework.util.StringUtils; +import org.springframework.web.util.UriComponents; +import org.springframework.web.util.UriComponentsBuilder; + +public class PythonServeRunner implements Runner { + + private static ObjectMapper jsonMapper = JacksonMapper.CUSTOM_OBJECT_MAPPER; + private static final int HTTP_PORT = 8080; + + private final String image; + private final String command; + private final PythonFunctionSpec functionSpec; + private final Map> groupedSecrets; + + public PythonServeRunner( + String image, + String command, + PythonFunctionSpec functionPythonSpec, + Map> groupedSecrets + ) { + this.image = image; + this.command = command; + this.functionSpec = functionPythonSpec; + this.groupedSecrets = groupedSecrets; + } + + @Override + public K8sRunnable produce(Run run) { + PythonRunSpec runSpec = new PythonRunSpec(run.getSpec()); + PythonServeTaskSpec taskSpec = runSpec.getTaskServeSpec(); + + List coreEnvList = new ArrayList<>( + List.of(new CoreEnv("PROJECT_NAME", run.getProject()), new CoreEnv("RUN_ID", run.getId())) + ); + + Optional.ofNullable(taskSpec.getEnvs()).ifPresent(coreEnvList::addAll); + + //build nuclio definition + HashMap event = new HashMap<>(); + + // event.put("body", jsonMapper.writeValueAsString(run)); + + //define http trigger + //TODO use proper model + HashMap triggers = new HashMap<>(); + HashMap http = new HashMap<>(Map.of("kind", "http", "maxWorkers", 2)); + triggers.put("http", http); + + NuclioFunctionSpec nuclio = NuclioFunctionSpec + .builder() + .runtime("python") + //invoke user code wrapped via default handler + .handler("run_handler:handler_serve") + .triggers(triggers) + //directly invoke user code + // .handler("main:" + runSpec.getFunctionSpec().getSource().getHandler()) + .event(event) + .build(); + + String nuclioFunction = NuclioFunctionBuilder.write(nuclio); + + //read source and build context + List contextRefs = null; + List contextSources = new ArrayList<>(); + ContextSource fn = ContextSource + .builder() + .name("function.yaml") + .base64(Base64.getUrlEncoder().encodeToString(nuclioFunction.getBytes(StandardCharsets.UTF_8))) + .build(); + contextSources.add(fn); + + if (functionSpec.getSource() != null) { + SourceCode source = functionSpec.getSource(); + String path = "main.py"; + + if (StringUtils.hasText(source.getSource())) { + try { + //evaluate if local path (no scheme) + UriComponents uri = UriComponentsBuilder.fromUriString(source.getSource()).build(); + String scheme = uri.getScheme(); + + if (scheme != null) { + //write as ref + contextRefs = Collections.singletonList(ContextRef.from(source.getSource())); + } else { + if (StringUtils.hasText(path)) { + //override path for local src + path = uri.getPath(); + if (path.startsWith(".")) { + path = path.substring(1); + } + } + } + } catch (IllegalArgumentException e) { + //skip invalid source + } + } + + if (StringUtils.hasText(source.getBase64())) { + contextSources.add(ContextSource.builder().name(path).base64(source.getBase64()).build()); + } + } + + //merge env with PYTHON path override + coreEnvList.add(new CoreEnv("PYTHONPATH", "${PYTHONPATH}:/shared/")); + + //expose http trigger only + CorePort servicePort = new CorePort(HTTP_PORT, HTTP_PORT); + + K8sRunnable k8sServeRunnable = K8sServeRunnable + .builder() + .runtime(PythonRuntime.RUNTIME) + .task(PythonServeTaskSpec.KIND) + .state(State.READY.name()) + //base + .image(StringUtils.hasText(functionSpec.getImage()) ? functionSpec.getImage() : image) + .command(command) + // .args(new String[] { run.getProject(), run.getId() }) + .args(new String[] { "--config", "/shared/function.yaml" }) + .contextRefs(contextRefs) + .contextSources(contextSources) + .envs(coreEnvList) + .secrets(groupedSecrets) + .resources(taskSpec.getResources()) + .volumes(taskSpec.getVolumes()) + .nodeSelector(taskSpec.getNodeSelector()) + .affinity(taskSpec.getAffinity()) + .tolerations(taskSpec.getTolerations()) + //specific + .replicas(taskSpec.getReplicas()) + .servicePorts(List.of(servicePort)) + .serviceType(taskSpec.getServiceType()) + .build(); + + k8sServeRunnable.setId(run.getId()); + k8sServeRunnable.setProject(run.getProject()); + + return k8sServeRunnable; + } +} diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/run/PythonRunSpec.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/run/PythonRunSpec.java index 8fd148841..0d88d1322 100644 --- a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/run/PythonRunSpec.java +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/run/PythonRunSpec.java @@ -7,6 +7,7 @@ import it.smartcommunitylabdhub.runtime.python.PythonRuntime; import it.smartcommunitylabdhub.runtime.python.specs.function.PythonFunctionSpec; import it.smartcommunitylabdhub.runtime.python.specs.task.PythonJobTaskSpec; +import it.smartcommunitylabdhub.runtime.python.specs.task.PythonServeTaskSpec; import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -25,6 +26,9 @@ public class PythonRunSpec extends RunBaseSpec { @JsonUnwrapped private PythonJobTaskSpec taskJobSpec; + @JsonUnwrapped + private PythonServeTaskSpec taskServeSpec; + @JsonUnwrapped private PythonFunctionSpec functionSpec; @@ -45,6 +49,7 @@ public void configure(Map data) { PythonRunSpec spec = mapper.convertValue(data, PythonRunSpec.class); this.taskJobSpec = spec.getTaskJobSpec(); + this.taskServeSpec = spec.getTaskServeSpec(); this.functionSpec = spec.getFunctionSpec(); this.inputs = spec.getInputs(); @@ -56,6 +61,10 @@ public void setTaskJobSpec(PythonJobTaskSpec taskJobSpec) { this.taskJobSpec = taskJobSpec; } + public void setTaskServeSpec(PythonServeTaskSpec taskServeSpec) { + this.taskServeSpec = taskServeSpec; + } + public void setFunctionSpec(PythonFunctionSpec functionSpec) { this.functionSpec = functionSpec; } diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpec.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpec.java new file mode 100644 index 000000000..7947259b2 --- /dev/null +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpec.java @@ -0,0 +1,47 @@ +package it.smartcommunitylabdhub.runtime.python.specs.task; + +import com.fasterxml.jackson.annotation.JsonProperty; +import it.smartcommunitylabdhub.commons.annotations.common.SpecType; +import it.smartcommunitylabdhub.commons.models.enums.EntityName; +import it.smartcommunitylabdhub.framework.k8s.base.K8sTaskBaseSpec; +import it.smartcommunitylabdhub.framework.k8s.objects.CoreServiceType; +import it.smartcommunitylabdhub.runtime.python.PythonRuntime; +import jakarta.validation.constraints.Min; +import java.io.Serializable; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@SpecType(runtime = PythonRuntime.RUNTIME, kind = PythonServeTaskSpec.KIND, entity = EntityName.TASK) +public class PythonServeTaskSpec extends K8sTaskBaseSpec { + + public static final String KIND = "python+serve"; + + @JsonProperty("replicas") + @Min(0) + private Integer replicas; + + // ClusterIP or NodePort + @JsonProperty(value = "service_type", defaultValue = "NodePort") + private CoreServiceType serviceType; + + public PythonServeTaskSpec(Map data) { + configure(data); + } + + @Override + public void configure(Map data) { + super.configure(data); + + PythonServeTaskSpec spec = mapper.convertValue(data, PythonServeTaskSpec.class); + + this.replicas = spec.getReplicas(); + this.setServiceType(spec.getServiceType()); + } +} diff --git a/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpecFactory.java b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpecFactory.java new file mode 100644 index 000000000..0b67a156d --- /dev/null +++ b/modules/runtime-python/src/main/java/it/smartcommunitylabdhub/runtime/python/specs/task/PythonServeTaskSpecFactory.java @@ -0,0 +1,20 @@ +package it.smartcommunitylabdhub.runtime.python.specs.task; + +import it.smartcommunitylabdhub.commons.infrastructure.SpecFactory; +import java.io.Serializable; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class PythonServeTaskSpecFactory implements SpecFactory { + + @Override + public PythonServeTaskSpec create() { + return new PythonServeTaskSpec(); + } + + @Override + public PythonServeTaskSpec create(Map data) { + return new PythonServeTaskSpec(data); + } +}