Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iteration on Nomad/Variables <-> Nextflow/Secrets #75

Merged
merged 14 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ abstract class GenerateIdxTask extends DefaultTask{

def matcher = new SourcesMatcher(project)
def extensionsClassName = matcher.pluginExtensions
extensionsClassName += matcher.providers
def traceClassName = matcher.traceObservers
def all = extensionsClassName+traceClassName
output.text = all.join('\n')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class SourcesMatcher {
findSources(/class (\w+) extends Executor implements ExtensionPoint/)
}

List<String> getProviders(){
return findSources(/implements SecretsProvider/)
}


List<String> getTraceObservers(){
return findSources(/class (\w+) implements TraceObserverFactory/)
}
Expand Down
27 changes: 26 additions & 1 deletion plugins/nf-nomad/src/main/nextflow/nomad/NomadPlugin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package nextflow.nomad

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cli.PluginAbstractExec
import nextflow.nomad.secrets.NomadSecretCmd
import nextflow.nomad.executor.TaskDirectives
import nextflow.plugin.BasePlugin
import nextflow.script.ProcessConfig
import nextflow.secret.SecretsLoader
import org.pf4j.PluginWrapper

/**
Expand All @@ -30,14 +34,35 @@ import org.pf4j.PluginWrapper
* @author : matthdsm <[email protected]>
*/
@CompileStatic
class NomadPlugin extends BasePlugin {
@Slf4j
class NomadPlugin extends BasePlugin implements PluginAbstractExec{

NomadPlugin(PluginWrapper wrapper) {
super(wrapper)
addCustomDirectives()
SecretsLoader.instance.reset()
}

private static void addCustomDirectives() {
ProcessConfig.DIRECTIVES.addAll(TaskDirectives.ALL)
}

@Override
List<String> getCommands() {
return ['secrets']
}

@Override
int exec(String cmd, List<String> args) {
return switch (cmd){
case 'secrets'-> secrets(args.first(), args.drop(1))
default -> -1
}
}

int secrets(String action, List<String>args){
NomadSecretCmd nomadSecretCmd = new NomadSecretCmd()
nomadSecretCmd.runCommand( session.config , action, args)
return 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class NomadJobOpts{

JobConstraints constraintsSpec

NomadSecretOpts secretOpts

NomadJobOpts(Map nomadJobOpts, Map<String,String> env=null){
assert nomadJobOpts!=null

Expand Down Expand Up @@ -76,6 +78,7 @@ class NomadJobOpts{
this.affinitySpec = parseAffinity(nomadJobOpts)
this.constraintSpec = parseConstraint(nomadJobOpts)
this.constraintsSpec = parseConstraints(nomadJobOpts)
this.secretOpts = parseSecrets(nomadJobOpts)
}

JobVolume[] parseVolumes(Map nomadJobOpts){
Expand Down Expand Up @@ -158,4 +161,14 @@ class NomadJobOpts{
null
}
}

NomadSecretOpts parseSecrets(Map nomadJobOpts){
if (nomadJobOpts.secrets && nomadJobOpts.secrets instanceof Map) {
def secretOpts = new NomadSecretOpts(nomadJobOpts.secrets as Map)
secretOpts
}else{
null
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package nextflow.nomad.config

class NomadSecretOpts {

final Boolean enable
final String path

NomadSecretOpts(Map map){
this.enable = map.containsKey('enable') ? map.get('enable') as boolean : false
this.path = map.path ?: "secrets/nf-nomad"
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jagedn , I've renamed the enable to enabled in order to comply with the general pattern of enabling scopes as per Nextflow configuration


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package nextflow.nomad.executor
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.ApiException
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.api.VariablesApi
import io.nomadproject.client.model.*
import nextflow.nomad.models.ConstraintsBuilder
import nextflow.nomad.models.JobConstraints
Expand All @@ -43,8 +45,9 @@ import java.nio.file.Path
class NomadService implements Closeable{

NomadConfig config

ApiClient apiClient
JobsApi jobsApi
VariablesApi variablesApi

NomadService(NomadConfig config) {
this.config = config
Expand All @@ -54,7 +57,7 @@ class NomadService implements Closeable{
final READ_TIMEOUT_MILLISECONDS = 60000
final WRITE_TIMEOUT_MILLISECONDS = 60000

ApiClient apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
apiClient.basePath = config.clientOpts().address
log.debug "[NOMAD] Client Address: ${config.clientOpts().address}"

Expand All @@ -63,6 +66,7 @@ class NomadService implements Closeable{
apiClient.apiKey = config.clientOpts().token
}
this.jobsApi = new JobsApi(apiClient)
this.variablesApi = new VariablesApi(apiClient)
}

protected Resources getResources(TaskRun task) {
Expand Down Expand Up @@ -110,6 +114,9 @@ class NomadService implements Closeable{
try {
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null)
jobRegisterResponse.evalID
} catch( ApiException apiException){
log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
} catch (Throwable e) {
log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${e.message ?: e}", e)
throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${e.message ?: e}", e)
Expand Down Expand Up @@ -186,7 +193,7 @@ class NomadService implements Closeable{
affinity(task, taskDef)
constraint(task, taskDef)
constraints(task, taskDef)

secrets(task, taskDef)
return taskDef
}

Expand Down Expand Up @@ -276,7 +283,21 @@ class NomadService implements Closeable{
taskDef
}


protected Task secrets(TaskRun task, Task taskDef){
if( config.jobOpts()?.secretOpts?.enable) {
def secrets = task.processor?.config?.get(TaskDirectives.SECRETS)
if (secrets) {
Template template = new Template(envvars: true, destPath: "/secrets/nf-nomad")
String secretPath = config.jobOpts()?.secretOpts?.path
String tmpl = secrets.collect { String name ->
"${name}={{ with nomadVar \"$secretPath/${name}\" }}{{ .${name} }}{{ end }}"
}.join('\n').stripIndent()
template.embeddedTmpl(tmpl)
taskDef.addTemplatesItem(template)
}
}
taskDef
}

protected Job assignDatacenters(TaskRun task, Job job){
def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
Expand Down Expand Up @@ -365,4 +386,49 @@ class NomadService implements Closeable{
throw new ProcessSubmitException("[NOMAD] Failed to get alloactions ${jobId} -- Cause: ${e.message ?: e}", e)
}
}

String getVariableValue(String key){
getVariableValue(config.jobOpts().secretOpts?.path, key)
}

String getVariableValue(String path, String key){
var variable = variablesApi.getVariableQuery("$path/$key",
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
variable?.items?.find{ it.key == key }?.value
}

void setVariableValue(String key, String value){
setVariableValue(config.jobOpts().secretOpts?.path, key, value)
}

void setVariableValue(String path, String key, String value){
var content = Map.of(key,value)
var variable = new Variable(path: path, items: content)
variablesApi.postVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}

List<String> getVariablesList(){
var listRequest = variablesApi.getVariablesListRequest(
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null, null, null, null, null)
listRequest.collect{ it.path}
}

void deleteVariable(String key){
deleteVariable(config.jobOpts().secretOpts?.path, key)
}

void deleteVariable(String path, String key){
var variable = new Variable( items: Map.of(key, ""))
variablesApi.deleteVariable("$path/$key", variable,
config.jobOpts().region,
config.jobOpts().namespace,
null, null, null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ class TaskDirectives {

public static final String CONSTRAINTS = "constraints"

public static final String SECRETS = "secret"

public static final List<String> ALL = [
DATACENTERS,
CONSTRAINTS
CONSTRAINTS,
SECRETS
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package nextflow.nomad.secrets

import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.nomad.config.NomadConfig
import nextflow.nomad.executor.NomadService
import nextflow.plugin.Priority
import nextflow.secret.Secret
import nextflow.secret.SecretImpl
import nextflow.secret.SecretsProvider

@Slf4j
class NomadSecretCmd {

protected NomadService service
protected NomadConfig nomadConfig

int runCommand(Map config, String action, List<String> args){
nomadConfig = new NomadConfig((config.nomad ?: Collections.emptyMap()) as Map)
service = new NomadService(nomadConfig)
return switch (action){
case 'get' ->execGetSecretNames(args.removeAt(0).toString())
case 'set' ->execSetSecretNames(args.removeAt(0).toString(),args.removeAt(0).toString())
case 'list'->execListSecretsNames()
case 'delete'->execDeleteSecretNames(args.removeAt(0).toString())
default -> -1
}
}

int execListSecretsNames(){
def list = listSecretsNames()
println list.join('\n')
return 0
}

int execGetSecretNames(String name){
if(!name){
throw new AbortOperationException("Wrong number of arguments")
}
def secret = getSecret(name)
println secret
return 0
}

int execSetSecretNames(String name, String value){
if(!name){
throw new AbortOperationException("Wrong number of arguments")
}
setSecret(name, value)
return 0
}

int execDeleteSecretNames(String name){
if(!name){
throw new AbortOperationException("Wrong number of arguments")
}
deleteSecret(name)
return 0
}

String getSecret(String name) {
String value = service.getVariableValue(name)
if( !value )
throw new AbortOperationException("Missing secret name")
value
}

Set<String> listSecretsNames() {
service.variablesList
}

void setSecret(String name, String value) {
service.setVariableValue(name, value)
}

void deleteSecret(String name){
service.deleteVariable(name)
}
}
Loading
Loading