Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline launch label support #322

Merged
merged 6 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/reflect-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2809,6 +2809,11 @@
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"io.seqera.tower.model.PipelineOptimizationStatus",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true
},
{
"name":"io.seqera.tower.model.PipelineSecret",
"allDeclaredFields":true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ protected String apiUrl() {
return app().url;
}

protected String token() {
return app().token;
}

protected OrgAndWorkspaceDbDto findOrgAndWorkspaceByName(String organizationName, String workspaceName) throws ApiException {
ListWorkspacesAndOrgResponse workspacesAndOrgResponse = api().listWorkspacesUser(userId());

Expand Down
113 changes: 113 additions & 0 deletions src/main/java/io/seqera/tower/cli/commands/LaunchCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

package io.seqera.tower.cli.commands;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.seqera.tower.ApiException;
import io.seqera.tower.cli.commands.enums.OutputType;
import io.seqera.tower.cli.commands.global.WorkspaceOptionalOptions;
import io.seqera.tower.cli.exceptions.InvalidResponseException;
import io.seqera.tower.cli.responses.Response;
import io.seqera.tower.cli.responses.runs.RunSubmited;
import io.seqera.tower.model.ComputeEnvResponseDto;
import io.seqera.tower.model.CreateLabelRequest;
import io.seqera.tower.model.CreateLabelResponse;
import io.seqera.tower.model.LabelDbDto;
import io.seqera.tower.model.LabelType;
import io.seqera.tower.model.Launch;
import io.seqera.tower.model.ListLabelsResponse;
import io.seqera.tower.model.ListPipelinesResponse;
import io.seqera.tower.model.PipelineDbDto;
import io.seqera.tower.model.SubmitWorkflowLaunchRequest;
Expand All @@ -31,10 +38,23 @@
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import javax.annotation.Nullable;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static io.seqera.tower.cli.utils.FilesHelper.readString;
import static io.seqera.tower.cli.utils.ModelHelper.coalesce;
Expand Down Expand Up @@ -75,6 +95,9 @@ public class LaunchCmd extends AbstractRootCmd {
@Option(names = {"--wait"}, description = "Wait until given status or fail. Valid options: ${COMPLETION-CANDIDATES}.")
public WorkflowStatus wait;

@Option(names = {"-l", "--labels"}, split = ",", description = "Comma-separated list of labels for the pipeline.")
List<String> labels;

@ArgGroup(heading = "%nAdvanced options:%n", validate = false)
AdvancedOptions adv;

Expand All @@ -98,9 +121,12 @@ protected Response exec() throws ApiException, IOException {
protected Response runNextflowPipeline(Long wspId) throws ApiException, IOException {
// Retrieve the provided computeEnv or use the primary if not provided
ComputeEnvResponseDto ce = computeEnv != null ? computeEnvByRef(wspId, computeEnv) : primaryComputeEnv(wspId);
// Retrieve the IDs for the labels specified by the user if any
List<Long> labels = obtainLabelIDs(wspId);

return submitWorkflow(updateLaunchRequest(new WorkflowLaunchRequest()
.pipeline(pipeline)
.labelIds(labels.isEmpty() ? null : labels)
.computeEnvId(ce.getId())
.workDir(ce.getConfig().getWorkDir())
.preRunScript(ce.getConfig().getPreRunScript())
Expand All @@ -112,6 +138,7 @@ private WorkflowLaunchRequest updateLaunchRequest(WorkflowLaunchRequest base) th
return new WorkflowLaunchRequest()
.id(base.getId())
.pipeline(base.getPipeline())
.labelIds(base.getLabelIds())
.computeEnvId(base.getComputeEnvId())
.runName(coalesce(name, base.getRunName()))
.workDir(coalesce(workDir, base.getWorkDir()))
Expand All @@ -131,6 +158,7 @@ private WorkflowLaunchRequest updateLaunchRequest(WorkflowLaunchRequest base) th
}

protected Response runTowerPipeline(Long wspId) throws ApiException, IOException {

ListPipelinesResponse pipelines = api().listPipelines(Collections.emptyList(), wspId, 50, 0, pipeline, "all");
if (pipelines.getTotalSize() == 0) {
throw new InvalidResponseException(String.format("Pipeline '%s' not found on this workspace.", pipeline));
Expand Down Expand Up @@ -168,6 +196,9 @@ protected Response runTowerPipeline(Long wspId) throws ApiException, IOException
launchRequest.workDir(ce.getConfig().getWorkDir());
}

List<Long> labels = obtainLabelIDs(wspId);
launchRequest.labelIds(labels.isEmpty() ? null : labels);

return submitWorkflow(updateLaunchRequest(launchRequest), wspId, sourceWorkspaceId);
}

Expand Down Expand Up @@ -209,6 +240,88 @@ private WorkflowStatus checkWorkflowStatus(String workflowId, Long workspaceId)
}
}

private List<Long> obtainLabelIDs(@Nullable Long workspaceId) throws ApiException {

if (labels == null || labels.isEmpty()) {
return Collections.emptyList();
}

// retrieve labels for the workspace and check if we need to create new ones
List<LabelDbDto> wspLabels = new ArrayList<>();

ListLabelsResponse res = api().listLabels(workspaceId, null, null, null, LabelType.SIMPLE, null);
if (res.getLabels() != null) {
wspLabels.addAll(res.getLabels());
}

Map<String, Long> nameToID = wspLabels
.stream()
.collect(Collectors.toMap(LabelDbDto::getName, LabelDbDto::getId));

// get label names not registered in workspace (names are unique per wspID)
List<String> newLabels = labels
.stream()
.filter(labelName -> !nameToID.containsKey(labelName))
.collect(Collectors.toList());

if (!newLabels.isEmpty() && !labelPermission(workspaceId)) {
throw new ApiException("User does not have permission to modify pipeline labels");
}

// create the new ones via POST /labels
tcrespog marked this conversation as resolved.
Show resolved Hide resolved
for (String labelName: newLabels) {
CreateLabelResponse created = api().createLabel(
new CreateLabelRequest()
.name(labelName)
.resource(false)
.isDefault(false),
workspaceId
);
nameToID.put(created.getName(), created.getId());
}

// map requested label names to label IDs
return labels
.stream()
.map(nameToID::get)
.collect(Collectors.toList());
}

private boolean labelPermission(@Nullable Long wspId) throws ApiException {

// personal workspace
if (wspId == null) return true;

var client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();

var uri = UriBuilder
.fromUri(URI.create(apiUrl() + "/permissions"))
JaimeSeqLabs marked this conversation as resolved.
Show resolved Hide resolved
.queryParam("workspaceId", wspId.toString())
.build();

var req = HttpRequest.newBuilder()
.GET()
.uri(uri)
.header("Authorization", String.format("Bearer %s", token()))
.build();

try {
HttpResponse<String> response = client.send(req, HttpResponse.BodyHandlers.ofString()); // sync

JsonNode json = new ObjectMapper().readTree(response.body());

var roleSet = new HashSet<String>();

json.get("workspace").get("roles").forEach(role -> roleSet.add(role.textValue()));

return roleSet.contains("owner") || roleSet.contains("admin") || roleSet.contains("maintain");

} catch (Throwable exception) {
throw new ApiException("Unable to reach API");
}
}

private AdvancedOptions adv() {
if (adv == null) {
Expand Down
88 changes: 88 additions & 0 deletions src/test/java/io/seqera/tower/cli/LaunchCmdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import static org.mockserver.matchers.Times.exactly;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.mockserver.model.JsonBody.json;


class LaunchCmdTest extends BaseCmdTest {

Expand Down Expand Up @@ -217,6 +219,92 @@ void testSubmitLaunchpadPipelineWithCustomName(OutputType format, MockServerClie
assertOutput(format, out, new RunSubmited("35aLiS0bIM5efd", null, baseUserUrl(mock, "jordi"), USER_WORKSPACE_NAME));
}

@ParameterizedTest
@EnumSource(OutputType.class)
void testSubmitLaunchpadPipelineWithLabels(OutputType format, MockServerClient mock) {

// labels endpoint mock
mock.when(
request()
.withMethod("GET")
.withPath("/labels")
.withQueryStringParameter("type", "simple"),
exactly(1)
).respond(
response()
.withStatusCode(200)
.withBody(loadResource("labels_user"))
.withContentType(MediaType.APPLICATION_JSON)
);
mock.when(
request()
.withMethod("POST")
.withPath("/labels")
.withBody(json(" {\n" +
" \"name\": \"LabelThree\",\n" +
" \"resource\": false,\n" +
" \"isDefault\": false\n" +
" }\n")),
exactly(1)
).respond(
response()
.withStatusCode(200)
.withBody(json("{\n" +
" \"id\": 3,\n" +
" \"name\": \"LabelThree\",\n" +
" \"resource\": false,\n" +
" \"isDefault\": false\n" +
"}\n"))
.withContentType(MediaType.APPLICATION_JSON)
);

// pipelines endpoint mock
mock.when(
request().withMethod("GET").withPath("/pipelines"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("pipelines_sarek")).withContentType(MediaType.APPLICATION_JSON)
);

mock.when(
request().withMethod("GET").withPath("/pipelines/250911634275687/launch"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("pipeline_launch_describe")).withContentType(MediaType.APPLICATION_JSON)
);

// launch endpoint mock
mock.when(
request()
.withMethod("POST")
.withPath("/workflow/launch")
.withBody(json(" {\n" +
" \"launch\":{\n" +
" \"id\":\"5nmCvXcarkvv8tELMF4KyY\",\n" +
" \"computeEnvId\":\"4X7YrYJp9B1d1DUpfur7DS\",\n" +
" \"pipeline\":\"https://github.com/nf-core/sarek\",\n" +
" \"workDir\":\"/efs\",\n" +
" \"pullLatest\":false,\n" +
" \"stubRun\":false,\n" +
" \"labelIds\": [2, 3]\n" +
" }\n" +
" }\n")),
exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("workflow_launch")).withContentType(MediaType.APPLICATION_JSON)
);

mock.when(
request().withMethod("GET").withPath("/user-info"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("user")).withContentType(MediaType.APPLICATION_JSON)
);

// Run the command
ExecOut out = exec(format, mock, "launch", "sarek", "-l", "LabelTwo,LabelThree");

// Assert results
assertOutput(format, out, new RunSubmited("35aLiS0bIM5efd", null, baseUserUrl(mock, "jordi"), USER_WORKSPACE_NAME));
}

@Test
void testSubmitToAWorkspace(MockServerClient mock) {

Expand Down
20 changes: 20 additions & 0 deletions src/test/resources/runcmd/labels_user.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"labels": [

{
"id": 1,
"name": "LabelOne",
"value": null,
"resource": false,
"isDefault": false
},
{
"id": 2,
"name": "LabelTwo",
"value": null,
"resource": false,
"isDefault": false
}
],
"totalsize": 2
}
Loading