From 53737fbaca3f41022fdda38e5f26b639f5705e1b Mon Sep 17 00:00:00 2001 From: Gibson Chikafa Date: Thu, 5 Oct 2023 13:18:35 +0200 Subject: [PATCH] [HWORKS-697] Add airflow user to project (#1520) (#1408) --- .../src/test/ruby/spec/airflow_spec.rb | 6 + .../hopsworks/api/airflow/AirflowService.java | 31 +- .../hops/hopsworks/api/jobs/JobsResource.java | 4 +- .../jobs/executions/ExecutionsResource.java | 17 +- .../hopsworks/api/project/ProjectService.java | 4 +- .../common/airflow/AirflowJWTManager.java | 555 ------------------ .../common/dao/airflow/AirflowDag.java | 36 -- .../common/dao/airflow/AirflowDagFacade.java | 99 ---- .../MaterializedJWTFacade.java | 24 +- .../common/jupyter/JupyterJWTManager.java | 6 +- .../common/project/ProjectController.java | 13 + .../common/util/AccessController.java | 13 +- .../{airflow => jupyter}/MaterializedJWT.java | 2 +- .../MaterializedJWTID.java | 3 +- .../main/resources/META-INF/persistence.xml | 2 +- .../yo/app/scripts/controllers/projectCtrl.js | 14 +- .../yo/app/scripts/services/AirflowService.js | 3 - 17 files changed, 73 insertions(+), 759 deletions(-) delete mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java delete mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java delete mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java rename hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/{airflow => jupyter}/MaterializedJWTFacade.java (75%) rename hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/{airflow => jupyter}/MaterializedJWT.java (97%) rename hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/{airflow => jupyter}/MaterializedJWTID.java (97%) diff --git a/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb b/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb index 9b2db6918d..6c4ded01b0 100644 --- a/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb @@ -45,6 +45,12 @@ with_valid_project end + it 'should add airflow user to project' do + get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/projectMembers" + airflow_member = json_body.detect { |e| e[:user][:email] == "airflow@hopsworks.ai" } + expect(airflow_member).not_to be_nil + end + it "should be able to compose DAG" do get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/airflow/secretDir" expect_status_details(200) diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java index e13e1f77e3..1eca909852 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java @@ -21,7 +21,6 @@ import io.hops.hopsworks.api.jwt.JWTHelper; import io.hops.hopsworks.common.airflow.AirflowController; import io.hops.hopsworks.common.airflow.AirflowDagDTO; -import io.hops.hopsworks.common.airflow.AirflowJWTManager; import io.hops.hopsworks.common.dao.project.ProjectFacade; import io.hops.hopsworks.exceptions.AirflowException; import io.hops.hopsworks.jwt.annotation.JWTRequired; @@ -34,6 +33,7 @@ import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.enterprise.context.RequestScoped; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -51,49 +51,38 @@ public class AirflowService { @EJB private JWTHelper jwtHelper; @EJB - private AirflowJWTManager airflowJWTManager; - @EJB private AirflowController airflowController; private Integer projectId; // No @EJB annotation for Project, it's injected explicitly in ProjectService. private Project project; + // Audience for Airflow JWTs private static final String[] JWT_AUDIENCE = new String[]{Audience.API}; - + public AirflowService() { } - + public void setProjectId(Integer projectId) { this.projectId = projectId; this.project = this.projectFacade.find(projectId); } - + public Integer getProjectId() { return projectId; } - @POST - @Path("/jwt") - @Produces(MediaType.APPLICATION_JSON) - @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST}) - @JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"}) - @ApiOperation(value = "Generate a JWT for Airflow usage and store it in project's secret directory in Airflow") - public Response storeAirflowJWT(@Context SecurityContext sc) throws AirflowException { - Users user = jwtHelper.getUserPrincipal(sc); - airflowJWTManager.prepareSecurityMaterial(user, project, JWT_AUDIENCE); - return Response.noContent().build(); - } - @POST @Path("/dag") @Produces(MediaType.APPLICATION_JSON) @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST}) - @JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"}) + @JWTRequired(acceptedTokens = {Audience.API}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"}) @ApiOperation(value = "Generate an Airflow Python DAG file from a DAG definition") - public Response composeDAG(AirflowDagDTO dagDefinition, @Context SecurityContext sc) throws AirflowException { + public Response composeDAG(AirflowDagDTO dagDefinition, + @Context HttpServletRequest req, + @Context SecurityContext sc) throws AirflowException { Users user = jwtHelper.getUserPrincipal(sc); airflowController.composeDAG(project, user, dagDefinition); return Response.ok().build(); } -} +} \ No newline at end of file diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java index 50a5033e36..843406e65d 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java @@ -112,7 +112,7 @@ public JobsResource setProject(Integer projectId) { @Produces(MediaType.APPLICATION_JSON) @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"}) @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) public Response getAll( @BeanParam Pagination pagination, @@ -136,7 +136,7 @@ public Response getAll( @Produces(MediaType.APPLICATION_JSON) @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"}) @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) public Response getJob(@PathParam("name") String name, @BeanParam JobsBeanParam jobsBeanParam, diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java index 3389a1220b..b66fec7a88 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java @@ -25,6 +25,7 @@ import io.hops.hopsworks.common.api.ResourceRequest; import io.hops.hopsworks.common.jobs.JobLogDTO; import io.hops.hopsworks.common.jobs.execution.ExecutionController; +import io.hops.hopsworks.common.util.Settings; import io.hops.hopsworks.exceptions.GenericException; import io.hops.hopsworks.exceptions.JobException; import io.hops.hopsworks.exceptions.ProjectException; @@ -67,6 +68,8 @@ public class ExecutionsResource { private ExecutionController executionController; @EJB private ExecutionsBuilder executionsBuilder; + @EJB + private Settings settings; @EJB @@ -85,7 +88,8 @@ public ExecutionsResource setJob(Jobs job) { @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) - @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", + "AGENT"}) public Response getExecutions( @BeanParam Pagination pagination, @BeanParam ExecutionsBeanParam executionsBeanParam, @@ -109,7 +113,8 @@ public Response getExecutions( @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) - @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", + "AGENT"}) public Response getExecution(@ApiParam(value = "execution id", required = true) @PathParam("id") Integer id, @BeanParam ExecutionsBeanParam executionsBeanParam, @Context UriInfo uriInfo, @@ -152,7 +157,8 @@ public Response stopExecution( @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) - @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", + "AGENT"}) public Response startExecution( @ApiParam(value = "Arguments for executing the job") String args, @Context SecurityContext sc, @@ -160,6 +166,11 @@ public Response startExecution( Users user = jWTHelper.getUserPrincipal(sc); + // run job as job owner if user is airflow + if (user.getUsername().equals(settings.getAirflowUser())) { + user = job.getCreator(); + } + Execution exec; if(!Strings.isNullOrEmpty(job.getJobConfig().getDefaultArgs()) && Strings.isNullOrEmpty(args)) { exec = executionController.start(job, job.getJobConfig().getDefaultArgs(), user); diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java index 2236d295a1..87993d6631 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java @@ -259,7 +259,7 @@ public Response getAllProjects(@Context HttpServletRequest req, @Context Securit @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) @ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"}) @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) @Produces(MediaType.APPLICATION_JSON) public Response getProjectByName(@PathParam("projectName") String projectName, @@ -272,7 +272,7 @@ public Response getProjectByName(@PathParam("projectName") String projectName, @GET @Path("/asShared/getProjectInfo/{projectName}") @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"}) - @ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"}) + @ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT"}) @Produces(MediaType.APPLICATION_JSON) public Response getProjectByNameAsShared(@PathParam("projectName") String projectName, @Context HttpServletRequest req, @Context SecurityContext sc) throws ProjectException, GenericException { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java deleted file mode 100644 index 34e79832c5..0000000000 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * This file is part of Hopsworks - * Copyright (C) 2019, Logical Clocks AB. All rights reserved - * - * Hopsworks is free software: you can redistribute it and/or modify it under the terms of - * the GNU Affero General Public License as published by the Free Software Foundation, - * either version 3 of the License, or (at your option) any later version. - * - * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR - * PURPOSE. See the GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License along with this program. - * If not, see . - */ - -package io.hops.hopsworks.common.airflow; - -import com.auth0.jwt.exceptions.JWTDecodeException; -import com.auth0.jwt.interfaces.DecodedJWT; -import io.hops.hopsworks.common.dao.airflow.AirflowDag; -import io.hops.hopsworks.common.dao.airflow.AirflowDagFacade; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT; -import io.hops.hopsworks.common.dao.airflow.MaterializedJWTFacade; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID; -import io.hops.hopsworks.persistence.entity.project.Project; -import io.hops.hopsworks.common.dao.project.ProjectFacade; -import io.hops.hopsworks.persistence.entity.user.BbcGroup; -import io.hops.hopsworks.common.dao.user.UserFacade; -import io.hops.hopsworks.persistence.entity.user.Users; -import io.hops.hopsworks.common.hdfs.HdfsUsersController; -import io.hops.hopsworks.common.security.CertificateMaterializer; -import io.hops.hopsworks.common.util.DateUtils; -import io.hops.hopsworks.common.util.Settings; -import io.hops.hopsworks.exceptions.AirflowException; -import io.hops.hopsworks.jwt.Constants; -import io.hops.hopsworks.restutils.RESTCodes; -import io.hops.hopsworks.jwt.JWTController; -import io.hops.hopsworks.jwt.SignatureAlgorithm; -import io.hops.hopsworks.jwt.exception.InvalidationException; -import io.hops.hopsworks.jwt.exception.JWTException; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.FileUtils; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import javax.ejb.AccessTimeout; -import javax.ejb.DependsOn; -import javax.ejb.EJB; -import javax.ejb.Lock; -import javax.ejb.LockType; -import javax.ejb.Singleton; -import javax.ejb.Startup; -import javax.ejb.Timeout; -import javax.ejb.Timer; -import javax.ejb.TimerConfig; -import javax.ejb.TimerService; -import javax.ejb.TransactionAttribute; -import javax.ejb.TransactionAttributeType; -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.LinkOption; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.GroupPrincipal; -import java.nio.file.attribute.PosixFileAttributeView; -import java.nio.file.attribute.PosixFilePermission; -import java.security.GeneralSecurityException; -import java.sql.SQLException; -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -@Singleton -@Startup -@TransactionAttribute(TransactionAttributeType.NEVER) -@DependsOn("Settings") -public class AirflowJWTManager { - private final static Logger LOG = Logger.getLogger(AirflowJWTManager.class.getName()); - - private final static String TOKEN_FILE_SUFFIX = ".jwt"; - private final static Set TOKEN_FILE_PERMISSIONS = new HashSet<>(5); - static { - TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_READ); - TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_WRITE); - TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_EXECUTE); - - TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.GROUP_READ); - TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.GROUP_EXECUTE); - } - - private final TreeSet airflowJWTs = new TreeSet<>(new Comparator() { - @Override - public int compare(AirflowJWT t0, AirflowJWT t1) { - if (t0.equals(t1)) { - return 0; - } else { - if (t0.expiration.isBefore(t1.expiration)) { - return -1; - } else if (t0.expiration.isAfter(t1.expiration)) { - return 1; - } - return 0; - } - } - }); - - @EJB - private HdfsUsersController hdfsUsersController; - @EJB - private Settings settings; - @EJB - private JWTController jwtController; - @EJB - private AirflowDagFacade airflowDagFacade; - @EJB - private CertificateMaterializer certificateMaterializer; - @EJB - private MaterializedJWTFacade materializedJWTFacade; - @EJB - private UserFacade userFacade; - @EJB - private ProjectFacade projectFacade; - @Resource - private TimerService timerService; - - private GroupPrincipal airflowGroup; - private volatile boolean initialized = false; - - @PostConstruct - @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) - public void init() { - Path airflowPath = Paths.get(settings.getAirflowDir()); - // This requires airflow::install to run before hopsworks is deployed. - if (airflowPath.toFile().isDirectory()) { - long interval = Math.max(1000L, settings.getJWTExpLeewaySec() * 1000L / 2); - timerService.createIntervalTimer(10L, interval, new TimerConfig("Airflow init/JWT renewal timer", false)); - } - } - - private void initAirflow() { - Path airflowPath = Paths.get(settings.getAirflowDir()); - try { - airflowGroup = Files.getFileAttributeView(airflowPath, PosixFileAttributeView.class, - LinkOption.NOFOLLOW_LINKS).readAttributes().group(); - try { - recover(); - } catch (Exception ex) { - LOG.log(Level.WARNING, "AirflowManager failed to recover, some already running workloads might be disrupted"); - } - // This is a dummy query to initialize airflowPool metrics for Prometheus - airflowDagFacade.getAllWithLimit(1); - initialized = true; - } catch (IOException | SQLException ex) { - LOG.log(Level.SEVERE, "Failed to initialize AirflowManager", ex); - } - LOG.log(Level.SEVERE, "AirflowManager initialized"); - } - - /** - * Recover security material for Airflow after restart. Read all active material from the database. - * Check if JWT exists in the local filesystem and it is valid. - * If not try to create a new one. Finally, materialize X.509 for project specific user. - */ - private void recover() { - LOG.log(Level.FINE, "Starting Airflow manager recovery"); - List failed2recover = new ArrayList<>(); - Project project = null; - Users user = null; - // Get last known state from storage - for (MaterializedJWT material : materializedJWTFacade.findAll4Airflow()) { - LOG.log(Level.FINEST, "Recovering material: " + material.getIdentifier().getProjectId() + " - " - + material.getIdentifier().getUserId()); - project = projectFacade.find(material.getIdentifier().getProjectId()); - user = userFacade.find(material.getIdentifier().getUserId()); - if (project == null || user == null) { - LOG.log(Level.WARNING, "Error while recovering Project with ID: " + material.getIdentifier().getProjectId() - + " and User ID: " + material.getIdentifier().getUserId() + ". Project or user is null"); - failed2recover.add(material); - continue; - } - - Path tokenFile = Paths.get(getProjectSecretsDirectory(user.getUsername()).toString(), - getTokenFileName(project.getName(), user.getUsername())); - AirflowJWT airflowJWT; - String token = null; - String materialIdentifier = "Project: " + project.getName() + " - User: " + user.getUsername(); - try { - // First try to read JWT from the filesystem. We expect most of the cases this will succeed. - token = FileUtils.readFileToString(tokenFile.toFile(), Charset.defaultCharset()); - DecodedJWT decoded = jwtController.verifyToken(token, settings.getJWTIssuer()); - airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(), - DateUtils.date2LocalDateTime(decoded.getExpiresAt()), user.getUid()); - airflowJWT.tokenFile = tokenFile; - airflowJWT.token = token; - LOG.log(Level.FINE, "Successfully read existing JWT from local filesystem for " + materialIdentifier); - } catch (IOException | JWTException | JWTDecodeException ex) { - // JWT does not exist in the filesystem or we cannot read them or it is not valid any longer - // We will create a new one - //TODO(Antonis): Not very good that audience is hardcoded, but it is not accessible from hopsworks-common - String[] audience = new String[]{"api"}; - LocalDateTime expirationDate = DateUtils.getNow().plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS); - String[] roles = getUserRoles(user); - try { - LOG.log(Level.FINEST, "JWT for " + materialIdentifier + " does not exist in the local FS or it is not " - + "valid any longer, creating new one..."); - Map claims = new HashMap<>(3); - claims.put(Constants.RENEWABLE, false); - claims.put(Constants.EXPIRY_LEEWAY, settings.getJWTExpLeewaySec()); - claims.put(Constants.ROLES, roles); - token = jwtController.createToken(settings.getJWTSigningKeyName(), false, settings.getJWTIssuer(), - audience, DateUtils.localDateTime2Date(expirationDate), - DateUtils.localDateTime2Date(DateUtils.getNow()), user.getUsername(), - claims, SignatureAlgorithm.valueOf(settings.getJWTSignatureAlg())); - airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(), - expirationDate, user.getUid()); - airflowJWT.tokenFile = tokenFile; - airflowJWT.token = token; - writeTokenToFile(airflowJWT); - LOG.log(Level.FINE, "Created new JWT for " + materialIdentifier + " and flushed to local FS"); - } catch (IOException ex1) { - // Managed to create token but failed to write - LOG.log(Level.WARNING, "Could not write to local FS new JWT for recovered material " + materialIdentifier - + ". We will invalidate it and won't renew it.", ex1); - if (token != null) { - try { - LOG.log(Level.FINE, "Failed to write JWT for " + materialIdentifier + ". Invalidating it..."); - jwtController.invalidate(token); - } catch (InvalidationException ex2) { - // Not much we can do about it - } - } - failed2recover.add(material); - continue; - } catch (GeneralSecurityException | JWTException ex1) { - LOG.log(Level.WARNING, "Tried to recover JWT for " + materialIdentifier + " but we failed. Giving up... " + - "JWT will not be available for Airflow DAGs", ex1); - // Initial token is invalid and could not create new. Give up - failed2recover.add(material); - continue; - } - } - - // If everything went fine with JWT, proceed with X.509 - try { - LOG.log(Level.FINEST, "Materializing X.509 for " + materialIdentifier); - certificateMaterializer.materializeCertificatesLocalCustomDir(user.getUsername(), project.getName(), - getProjectSecretsDirectory(user.getUsername()).toString()); - LOG.log(Level.FINE, "Materialized X.509 for " + materialIdentifier); - airflowJWTs.add(airflowJWT); - } catch (IOException ex) { - LOG.log(Level.WARNING, "Could not materialize X.509 for " + materialIdentifier - + " Invalidating JWT and deleting from FS. JWT and X.509 will not be available for Airflow DAGs.", ex); - // Could not materialize X.509 - if (token != null) { - try { - LOG.log(Level.FINE, "Failed to materialize X.509 for " + materialIdentifier - + " Invalidating JWT and deleting it from local FS."); - jwtController.invalidate(token); - FileUtils.deleteDirectory(getProjectSecretsDirectory(user.getUsername()).toFile()); - } catch (InvalidationException | IOException ex1) { - // Not much we can do about it - } - } - failed2recover.add(material); - } - } - - // Remove failed material from persistent storage - for (MaterializedJWT failed : failed2recover) { - materializedJWTFacade.delete(failed.getIdentifier()); - } - } - - private String[] getUserRoles(Users p) { - Collection groupList = p.getBbcGroupCollection(); - String[] roles = new String[groupList.size()]; - int idx = 0; - for (BbcGroup g : groupList) { - roles[idx] = g.getGroupName(); - idx++; - } - return roles; - } - - private void isInitialized() throws AirflowException { - if (!initialized) { - throw new AirflowException(RESTCodes.AirflowErrorCode.AIRFLOW_MANAGER_UNINITIALIZED, Level.WARNING, - "AirflowManager is not initialized", - "AirflowManager failed to initialize or Airflow is not deployed"); - } - } - - @Lock(LockType.READ) - @AccessTimeout(value = 1, unit = TimeUnit.SECONDS) - public void prepareSecurityMaterial(Users user, Project project, String[] audience) throws AirflowException { - isInitialized(); - MaterializedJWTID materialID = new MaterializedJWTID(project.getId(), user.getUid(), - MaterializedJWTID.USAGE.AIRFLOW); - if (!materializedJWTFacade.exists(materialID)) { - LocalDateTime expirationDate = DateUtils.getNow().plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS); - AirflowJWT airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(), expirationDate, - user.getUid()); - try { - String[] roles = getUserRoles(user); - MaterializedJWT airflowMaterial = new MaterializedJWT(new MaterializedJWTID(project.getId(), user.getUid(), - MaterializedJWTID.USAGE.AIRFLOW)); - materializedJWTFacade.persist(airflowMaterial); - Map claims = new HashMap<>(3); - claims.put(Constants.RENEWABLE, false); - claims.put(Constants.EXPIRY_LEEWAY, settings.getJWTExpLeewaySec()); - claims.put(Constants.ROLES, roles); - String token = jwtController.createToken(settings.getJWTSigningKeyName(), false, settings.getJWTIssuer(), - audience, DateUtils.localDateTime2Date(expirationDate), - DateUtils.localDateTime2Date(DateUtils.getNow()), user.getUsername(), - claims, SignatureAlgorithm.valueOf(settings.getJWTSignatureAlg())); - String projectAirflowDir = getProjectSecretsDirectory(user.getUsername()).toString(); - airflowJWT.tokenFile = Paths.get(projectAirflowDir, getTokenFileName(project.getName(), user.getUsername())); - - airflowJWT.token = token; - writeTokenToFile(airflowJWT); - certificateMaterializer.materializeCertificatesLocalCustomDir(user.getUsername(), project.getName(), - projectAirflowDir); - airflowJWTs.add(airflowJWT); - } catch (GeneralSecurityException | JWTException ex) { - deleteAirflowMaterial(materialID); - throw new AirflowException(RESTCodes.AirflowErrorCode.JWT_NOT_CREATED, Level.SEVERE, - "Could not generate Airflow JWT for user " + user.getUsername(), ex.getMessage(), ex); - } catch (IOException ex) { - LOG.log(Level.WARNING, "Could not write Airflow JWT for user " + hdfsUsersController - .getHdfsUserName(project, user), ex); - deleteAirflowMaterial(materialID); - try { - jwtController.invalidate(airflowJWT.token); - } catch (InvalidationException invEx) { - LOG.log(Level.FINE, "Could not invalidate Airflow JWT. Skipping...", ex); - } - throw new AirflowException(RESTCodes.AirflowErrorCode.JWT_NOT_STORED, Level.SEVERE, - "Could not store Airflow JWT for user " + hdfsUsersController.getHdfsUserName(project, user), - ex.getMessage(), ex); - } - } - } - - /** - * Timer bean to periodically (JWT expiration lee way / 2) (a) clean stale JWT and X.509 material for Airflow - * and (b) renew used JWTs. - * - * a. Iterate all the material in memory and clean those that don't have any entry in the database, project has been - * deleted or those that project exist but user does not own any non-paused DAG in Airflow. - * - * b. For a valid JWT, renew it if the time has come (after expiration time and before expiration + expLeeWay) - * - * @param timer - */ - @Lock(LockType.WRITE) - @AccessTimeout(value = 500) - @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) - @Timeout - public void monitorSecurityMaterial(Timer timer) { - if (!initialized) { - initAirflow(); - return; - } - try { - LocalDateTime now = DateUtils.getNow(); - // Clean unused token files and X.509 certificates - cleanStaleSecurityMaterial(); - - // Renew them - Set newTokens2Add = new HashSet<>(); - Iterator airflowJWTIt = airflowJWTs.iterator(); - while (airflowJWTIt.hasNext()) { - AirflowJWT airflowJWT = airflowJWTIt.next(); - // Set is sorted by expiration date - // If first does not need to be renewed, neither do the rest - if (airflowJWT.maybeRenew(now)) { - try { - LocalDateTime expirationDateTime = now.plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS); - Date expirationDate = DateUtils.localDateTime2Date(expirationDateTime); - String token = jwtController.renewToken(airflowJWT.token, expirationDate, - DateUtils.localDateTime2Date(DateUtils.getNow()), true, new HashMap<>(3)); - - AirflowJWT renewedJWT = new AirflowJWT(airflowJWT.username, airflowJWT.projectId, airflowJWT.projectName, - expirationDateTime, airflowJWT.uid); - renewedJWT.tokenFile = airflowJWT.tokenFile; - renewedJWT.token = token; - - airflowJWTIt.remove(); - writeTokenToFile(renewedJWT); - newTokens2Add.add(renewedJWT); - } catch (JWTException ex) { - LOG.log(Level.WARNING, "Could not renew Airflow JWT for " + airflowJWT, ex); - } catch (IOException ex) { - LOG.log(Level.WARNING, "Could not write renewed Airflow JWT for " + airflowJWT, ex); - try { - jwtController.invalidate(airflowJWT.token); - } catch (InvalidationException iex) { - LOG.log(Level.FINE, "Could not invalidate Airflow JWT. SKipping..."); - } - } catch (Exception ex) { - LOG.log(Level.SEVERE, "Generic error while renewing Airflow JWTs", ex); - } - } else { - break; - } - } - airflowJWTs.addAll(newTokens2Add); - } catch (Exception e) { - LOG.log(Level.SEVERE, "Got an exception while renewing/invalidating airflow jwt token", e); - } - } - - private Path getProjectSecretsDirectory(String username) { - return Paths.get(settings.getAirflowDir(), "secrets", generateOwnerSecret(username)); - } - - private String generateOwnerSecret(String username) { - return DigestUtils.sha256Hex(username); - } - - private void writeTokenToFile(AirflowJWT airflowJWT) throws IOException { - Path parent = airflowJWT.tokenFile.getParent(); - if (!parent.toFile().exists()) { - parent.toFile().mkdirs(); - Files.setPosixFilePermissions(parent, TOKEN_FILE_PERMISSIONS); - Files.getFileAttributeView(parent, PosixFileAttributeView.class, - LinkOption.NOFOLLOW_LINKS).setGroup(airflowGroup); - } - FileUtils.writeStringToFile(airflowJWT.tokenFile.toFile(), airflowJWT.token); - Files.setPosixFilePermissions(airflowJWT.tokenFile, TOKEN_FILE_PERMISSIONS); - Files.getFileAttributeView(airflowJWT.tokenFile, PosixFileAttributeView.class, - LinkOption.NOFOLLOW_LINKS).setGroup(airflowGroup); - } - - private void deleteAirflowMaterial(MaterializedJWTID identifier) { - materializedJWTFacade.delete(identifier); - } - - private String getTokenFileName(String projectName, String username) { - return projectName + "__" + username + TOKEN_FILE_SUFFIX; - } - - private boolean deleteDirectoryIfEmpty(Path directory) throws IOException { - File directoryFile = directory.toFile(); - File[] content = directoryFile.listFiles(); - if (content != null && content.length == 0) { - FileUtils.deleteDirectory(directoryFile); - return true; - } - return false; - } - - private void cleanStaleSecurityMaterial() { - Iterator airflowJWTsIt = airflowJWTs.iterator(); - while (airflowJWTsIt.hasNext()) { - AirflowJWT nextElement = airflowJWTsIt.next(); - try { - MaterializedJWTID materialId = new MaterializedJWTID(nextElement.projectId, nextElement.uid, - MaterializedJWTID.USAGE.AIRFLOW); - MaterializedJWT airflowMaterial = materializedJWTFacade.findById(materialId); - boolean shouldDelete = true; - - if (airflowMaterial != null) { - List ownedDags = airflowDagFacade.filterByOwner(nextElement.username); - for (AirflowDag dag : ownedDags) { - if (!dag.getPaused()) { - shouldDelete = false; - break; - } - } - } - - if (shouldDelete) { - certificateMaterializer.removeCertificatesLocalCustomDir(nextElement.username, nextElement.projectName, - getProjectSecretsDirectory(nextElement.username).toString()); - - FileUtils.deleteQuietly(nextElement.tokenFile.toFile()); - airflowJWTsIt.remove(); - if (airflowMaterial != null) { - deleteAirflowMaterial(materialId); - } - deleteDirectoryIfEmpty(nextElement.tokenFile.getParent()); - } - } catch (Exception ex) { - // Catch everything here. We don't want the timer thread to get killed (expunging timer) - // Be on the safe side and renew the token - LOG.log(Level.WARNING, "Could not determine if token " + nextElement + " is stale. It will be renewed!", ex); - } - } - } - - private class AirflowJWT { - private final String username; - private final Integer projectId; - private final String projectName; - private final LocalDateTime expiration; - private final Integer uid; - - private String token; - private Path tokenFile; - - private AirflowJWT(String username, Integer projectId, String projectName, LocalDateTime expiration, Integer uid) { - this.username = username; - this.projectId = projectId; - this.projectName = projectName; - this.expiration = expiration; - this.uid = uid; - } - - private boolean maybeRenew(LocalDateTime now) { - return now.isAfter(expiration) || now.isEqual(expiration); - } - - @Override - public int hashCode() { - int result = 17; - result = 31 * result + uid; - result = 31 * result + projectId; - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o instanceof AirflowJWT) { - AirflowJWT other = (AirflowJWT) o; - return uid.equals(other.uid) && projectId.equals(other.projectId); - } - return false; - } - - @Override - public String toString() { - return "Airflow JWT - Project ID: " + projectId + " User ID: " + uid; - } - } -} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java deleted file mode 100644 index 660cec5266..0000000000 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * This file is part of Hopsworks - * Copyright (C) 2018, Logical Clocks AB. All rights reserved - * - * Hopsworks is free software: you can redistribute it and/or modify it under the terms of - * the GNU Affero General Public License as published by the Free Software Foundation, - * either version 3 of the License, or (at your option) any later version. - * - * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR - * PURPOSE. See the GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License along with this program. - * If not, see . - */ - -package io.hops.hopsworks.common.dao.airflow; - -public class AirflowDag { - - private final String id; - private final Boolean paused; - - public AirflowDag(String id, Boolean paused) { - this.id = id; - this.paused = paused; - } - - public String getId() { - return id; - } - public Boolean getPaused() { - return paused; - } - -} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java deleted file mode 100644 index 504b29fb8a..0000000000 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * This file is part of Hopsworks - * Copyright (C) 2018, Logical Clocks AB. All rights reserved - * - * Hopsworks is free software: you can redistribute it and/or modify it under the terms of - * the GNU Affero General Public License as published by the Free Software Foundation, - * either version 3 of the License, or (at your option) any later version. - * - * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR - * PURPOSE. See the GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License along with this program. - * If not, see . - */ - -package io.hops.hopsworks.common.dao.airflow; - -import javax.annotation.Resource; -import javax.ejb.Stateless; -import javax.ejb.TransactionAttribute; -import javax.ejb.TransactionAttributeType; -import javax.sql.DataSource; -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.logging.Logger; - -/** - * This is special case of a facade. It does not use the EntityManager. - * - * Installation of Airflow is optional. This means that we can't have the - * airflow persistence unit statically in persistence.xml file as the Connection - * Pool and JDBC Resource won't exist in Glassfish. - * - * All persistence units in persistence.xml *must* exist during application - * deployment. Otherwise the deployment fails. - * - * To tackle this dynamic installation, we skip adding airflow PU in persistence.xml - * and we inject the JDBC Resource programmatically. - * - * The method in this EJB will only be invoked when Airflow is installed, so it is - * safe to inject jdbc/airflow DataSource. - * - * DO NOT invoke the DataSource directly! - * - * Look on other facades for examples on how to properly access the database. - */ - -@Stateless -@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) -public class AirflowDagFacade { - - private static final Logger LOGGER = Logger.getLogger(AirflowDagFacade.class.getName()); - private static final String DAGS_STATUS_QUERY = "SELECT dag_id, is_paused FROM airflow.dag WHERE owners = ?"; - private static final String GET_ALL_DAGS_WITH_LIMIT_QUERY = "SELECT dag_id, is_paused FROM airflow.dag LIMIT ?"; - - @Resource(name = "jdbc/airflow") - private DataSource airflowDataSource; - - public List filterByOwner(String owner) throws IOException, SQLException { - if (owner == null || owner.isEmpty()) { - throw new IOException("Airflow DAG owner cannot be null or empty"); - } - List dags = new ArrayList<>(); - try (Connection connection = airflowDataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(DAGS_STATUS_QUERY)) { - stmt.setString(1, owner); - try (ResultSet dagsRS = stmt.executeQuery()) { - while (dagsRS.next()) { - AirflowDag dag = new AirflowDag(dagsRS.getString("dag_id"), - dagsRS.getBoolean("is_paused")); - dags.add(dag); - } - return dags; - } - } - } - - public List getAllWithLimit(Integer limit) throws SQLException { - List dags = new ArrayList<>(); - try (Connection connection = airflowDataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(GET_ALL_DAGS_WITH_LIMIT_QUERY)) { - stmt.setInt(1, limit); - try (ResultSet dagsRS = stmt.executeQuery()) { - while (dagsRS.next()) { - AirflowDag dag = new AirflowDag(dagsRS.getString("dag_id"), - dagsRS.getBoolean("is_paused")); - dags.add(dag); - } - return dags; - } - } - } -} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java similarity index 75% rename from hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java rename to hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java index a9577a226e..59978ee32b 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java @@ -14,11 +14,11 @@ * If not, see . */ -package io.hops.hopsworks.common.dao.airflow; +package io.hops.hopsworks.common.dao.jupyter; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID; +import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT; +import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWTID; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; @@ -42,14 +42,14 @@ public MaterializedJWT findById(MaterializedJWTID identifier) { return entityManager.find(MaterializedJWT.class, identifier); } - public void persist(MaterializedJWT airflowMaterial) { - entityManager.persist(airflowMaterial); + public void persist(MaterializedJWT jupyterMaterial) { + entityManager.persist(jupyterMaterial); } public void delete(MaterializedJWTID identifier) { - MaterializedJWT airflowMaterial = findById(identifier); - if (airflowMaterial != null) { - entityManager.remove(airflowMaterial); + MaterializedJWT jupyterMaterial = findById(identifier); + if (jupyterMaterial != null) { + entityManager.remove(jupyterMaterial); } } @@ -58,13 +58,7 @@ public List findAll() { MaterializedJWT.class); return query.getResultList(); } - - public List findAll4Airflow() { - return entityManager.createNamedQuery("MaterializedJWT.findByUsage", MaterializedJWT.class) - .setParameter("usage", MaterializedJWTID.USAGE.AIRFLOW) - .getResultList(); - } - + public List findAll4Jupyter() { return entityManager.createNamedQuery("MaterializedJWT.findByUsage", MaterializedJWT.class) .setParameter("usage", MaterializedJWTID.USAGE.JUPYTER) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java index d609b7ba83..b4c99039a8 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java @@ -18,7 +18,7 @@ import com.auth0.jwt.exceptions.JWTDecodeException; import com.auth0.jwt.interfaces.DecodedJWT; -import io.hops.hopsworks.common.dao.airflow.MaterializedJWTFacade; +import io.hops.hopsworks.common.dao.jupyter.MaterializedJWTFacade; import io.hops.hopsworks.common.dao.jupyter.JupyterSettingsFacade; import io.hops.hopsworks.common.dao.jupyter.config.JupyterFacade; import io.hops.hopsworks.common.dao.project.ProjectFacade; @@ -33,8 +33,8 @@ import io.hops.hopsworks.jwt.SignatureAlgorithm; import io.hops.hopsworks.jwt.exception.InvalidationException; import io.hops.hopsworks.jwt.exception.JWTException; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT; -import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID; +import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT; +import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWTID; import io.hops.hopsworks.persistence.entity.jupyter.JupyterProject; import io.hops.hopsworks.persistence.entity.jupyter.JupyterSettings; import io.hops.hopsworks.persistence.entity.project.Project; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index 69bffd439e..8ff003faaf 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -838,6 +838,7 @@ public List> addService(Project project, ProjectServiceEnum service, U case AIRFLOW: addServiceDataset(project, user, Settings.ServiceDataset.AIRFLOW, dfso, udfso, Provenance.getDatasetProvCore(projectProvCore, Provenance.MLType.DATASET)); + addAirflowUser(project); airflowController.grantAirflowPermissions(project, udfso); break; } @@ -915,6 +916,17 @@ public Future addOnlineFsUser(Project projec return addServiceUser(project, OnlineFeaturestoreController.ONLINEFS_USERNAME, ProjectRoleTypes.DATA_SCIENTIST); } + public void addAirflowUser(Project project) { + Users airflowUser = userFacade.findByUsername(settings.getAirflowUser()); + ProjectTeamPK stp = new ProjectTeamPK(project.getId(), airflowUser.getEmail()); + ProjectTeam st = new ProjectTeam(stp); + st.setTeamRole(ProjectRoleTypes.DATA_SCIENTIST.getRole()); + st.setTimestamp(new Date()); + st.setUser(airflowUser); + st.setProject(project); + projectTeamFacade.persistProjectTeam(st); + } + private Future addServiceServing(Project project, Users user, DistributedFileSystemOps dfso, DistributedFileSystemOps udfso, ProvTypeDTO datasetProvCore) throws ProjectException, DatasetException, HopsSecurityException, @@ -962,6 +974,7 @@ private Future addServiceUser(Project projec throw new HopsSecurityException(RESTCodes.SecurityErrorCode.CERT_CREATION_ERROR, Level.SEVERE, "failed adding service user to project: " + project.getName() + "owner: " + username, e.getMessage(), e); } + // trigger project team role add handlers ProjectTeamRoleHandler.runProjectTeamRoleAddMembersHandlers(projectTeamRoleHandlers, project, Collections.singletonList(serviceUser), ProjectRoleTypes.fromString(st.getTeamRole()), true); diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java index af200ec0c9..ad89ffa5b9 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java @@ -25,6 +25,7 @@ import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.logging.Logger; @@ -34,6 +35,8 @@ @TransactionAttribute(TransactionAttributeType.NEVER) public class AccessController { private static final Logger LOGGER = Logger.getLogger(AccessController.class.getName()); + private static final List SERVICE_USERS = Arrays.asList("serving@hopsworks.se", "onlinefs@hopsworks.ai", + "airflow@hopsworks.ai"); public boolean hasAccess(Project userProject, Dataset targetDataset) { if(targetDataset.getProject().equals(userProject)) { @@ -63,17 +66,15 @@ public boolean hasExtendedAccess(Users user, Project project) { public Collection getExtendedMembers(Dataset dataset) { List members = dataset.getProject().getProjectTeamCollection().stream() - // filter serving and onlinefs user - .filter(pt -> !pt.getUser().getEmail().equals("serving@hopsworks.se") - && !pt.getUser().getEmail().equals("onlinefs@hopsworks.ai")) + // filter service users: onlinefs, serving, and airflow + .filter(pt -> !SERVICE_USERS.contains(pt.getUser().getEmail())) .collect(Collectors.toCollection(ArrayList::new)); //get members of projects that this dataset has been shared with List sharedDatasets = dataset.getDatasetSharedWithCollection().stream() .filter(DatasetSharedWith::getAccepted) .flatMap((sds) -> sds.getProject().getProjectTeamCollection().stream()) - // filter serving and onlinefs user - .filter(pt -> !pt.getUser().getEmail().equals("serving@hopsworks.se") - && !pt.getUser().getEmail().equals("onlinefs@hopsworks.ai")) + // filter service users: onlinefs, serving, and airflow + .filter(pt -> !SERVICE_USERS.contains(pt.getUser().getEmail())) .collect(Collectors.toCollection(ArrayList::new)); members.addAll(sharedDatasets); return members; diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java similarity index 97% rename from hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java rename to hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java index e0062e71c0..b3e84554f3 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java @@ -14,7 +14,7 @@ * If not, see . */ -package io.hops.hopsworks.persistence.entity.airflow; +package io.hops.hopsworks.persistence.entity.jupyter; import javax.persistence.EmbeddedId; import javax.persistence.Entity; diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java similarity index 97% rename from hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java rename to hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java index 618d9d3f03..8e3f0576cf 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java @@ -14,7 +14,7 @@ * If not, see . */ -package io.hops.hopsworks.persistence.entity.airflow; +package io.hops.hopsworks.persistence.entity.jupyter; import javax.persistence.Column; import javax.persistence.Embeddable; @@ -28,7 +28,6 @@ public class MaterializedJWTID implements Serializable { // Order is important. Always append! public enum USAGE { - AIRFLOW, JUPYTER } diff --git a/hopsworks-persistence/src/main/resources/META-INF/persistence.xml b/hopsworks-persistence/src/main/resources/META-INF/persistence.xml index 560df9b98d..004a2ebd2a 100644 --- a/hopsworks-persistence/src/main/resources/META-INF/persistence.xml +++ b/hopsworks-persistence/src/main/resources/META-INF/persistence.xml @@ -41,7 +41,7 @@ org.eclipse.persistence.jpa.PersistenceProvider jdbc/hopsworks - io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT + io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT io.hops.hopsworks.persistence.entity.certificates.UserCerts io.hops.hopsworks.persistence.entity.command.SystemCommand io.hops.hopsworks.persistence.entity.command.SystemCommandArguments diff --git a/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js b/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js index 217743c78d..2d9488f642 100644 --- a/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js +++ b/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js @@ -769,17 +769,11 @@ angular.module('hopsWorksApp') }; self.connectToAirflow = function () { - AirflowService.storeAirflowJWT(self.projectId).then( - function (success) { - // Open airlfow - var newTab = $window.open('about:blank', '_blank'); - $http.get(getApiLocationBase() + "/airflow").then ( function (response) { + // Open airlfow + var newTab = $window.open('about:blank', '_blank'); + $http.get(getApiLocationBase() + "/airflow").then ( function (response) { newTab.location.href = getApiLocationBase() + "/airflow/admin"; - }) - }, function (error) { - growl.error(error.data.errorMsg, {title: 'Error', ttl: 5000}); - } - ) + }) }; var kibanaNavVarInitKey = "hopsworks.kibana.navbar.set"; diff --git a/hopsworks-web/yo/app/scripts/services/AirflowService.js b/hopsworks-web/yo/app/scripts/services/AirflowService.js index 9448c850e7..698e294ca8 100644 --- a/hopsworks-web/yo/app/scripts/services/AirflowService.js +++ b/hopsworks-web/yo/app/scripts/services/AirflowService.js @@ -23,9 +23,6 @@ angular.module('hopsWorksApp') .factory('AirflowService', ['$http', function ($http) { return { - storeAirflowJWT: function (projectId) { - return $http.post('/api/project/' + projectId + "/airflow/jwt") - }, logout: function() { $http.get(getApiLocationBase() + '/airflow/admin/airflow/logout').then( function successCallback(response) {