diff --git a/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb b/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb
index 9b2db6918d..6c4ded01b0 100644
--- a/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb
+++ b/hopsworks-IT/src/test/ruby/spec/airflow_spec.rb
@@ -45,6 +45,12 @@
with_valid_project
end
+ it 'should add airflow user to project' do
+ get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/projectMembers"
+ airflow_member = json_body.detect { |e| e[:user][:email] == "airflow@hopsworks.ai" }
+ expect(airflow_member).not_to be_nil
+ end
+
it "should be able to compose DAG" do
get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/airflow/secretDir"
expect_status_details(200)
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java
index e13e1f77e3..1eca909852 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/airflow/AirflowService.java
@@ -21,7 +21,6 @@
import io.hops.hopsworks.api.jwt.JWTHelper;
import io.hops.hopsworks.common.airflow.AirflowController;
import io.hops.hopsworks.common.airflow.AirflowDagDTO;
-import io.hops.hopsworks.common.airflow.AirflowJWTManager;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.exceptions.AirflowException;
import io.hops.hopsworks.jwt.annotation.JWTRequired;
@@ -34,6 +33,7 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.context.RequestScoped;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -51,49 +51,38 @@ public class AirflowService {
@EJB
private JWTHelper jwtHelper;
@EJB
- private AirflowJWTManager airflowJWTManager;
- @EJB
private AirflowController airflowController;
private Integer projectId;
// No @EJB annotation for Project, it's injected explicitly in ProjectService.
private Project project;
+
// Audience for Airflow JWTs
private static final String[] JWT_AUDIENCE = new String[]{Audience.API};
-
+
public AirflowService() {
}
-
+
public void setProjectId(Integer projectId) {
this.projectId = projectId;
this.project = this.projectFacade.find(projectId);
}
-
+
public Integer getProjectId() {
return projectId;
}
- @POST
- @Path("/jwt")
- @Produces(MediaType.APPLICATION_JSON)
- @AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
- @JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
- @ApiOperation(value = "Generate a JWT for Airflow usage and store it in project's secret directory in Airflow")
- public Response storeAirflowJWT(@Context SecurityContext sc) throws AirflowException {
- Users user = jwtHelper.getUserPrincipal(sc);
- airflowJWTManager.prepareSecurityMaterial(user, project, JWT_AUDIENCE);
- return Response.noContent().build();
- }
-
@POST
@Path("/dag")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
- @JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
+ @JWTRequired(acceptedTokens = {Audience.API}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
@ApiOperation(value = "Generate an Airflow Python DAG file from a DAG definition")
- public Response composeDAG(AirflowDagDTO dagDefinition, @Context SecurityContext sc) throws AirflowException {
+ public Response composeDAG(AirflowDagDTO dagDefinition,
+ @Context HttpServletRequest req,
+ @Context SecurityContext sc) throws AirflowException {
Users user = jwtHelper.getUserPrincipal(sc);
airflowController.composeDAG(project, user, dagDefinition);
return Response.ok().build();
}
-}
+}
\ No newline at end of file
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java
index 50a5033e36..843406e65d 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/JobsResource.java
@@ -112,7 +112,7 @@ public JobsResource setProject(Integer projectId) {
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
- allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
public Response getAll(
@BeanParam Pagination pagination,
@@ -136,7 +136,7 @@ public Response getAll(
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
- allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
public Response getJob(@PathParam("name") String name,
@BeanParam JobsBeanParam jobsBeanParam,
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java
index 3389a1220b..b66fec7a88 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/jobs/executions/ExecutionsResource.java
@@ -25,6 +25,7 @@
import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.jobs.JobLogDTO;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
+import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
@@ -67,6 +68,8 @@ public class ExecutionsResource {
private ExecutionController executionController;
@EJB
private ExecutionsBuilder executionsBuilder;
+ @EJB
+ private Settings settings;
@EJB
@@ -85,7 +88,8 @@ public ExecutionsResource setJob(Jobs job) {
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
- @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
+ "AGENT"})
public Response getExecutions(
@BeanParam Pagination pagination,
@BeanParam ExecutionsBeanParam executionsBeanParam,
@@ -109,7 +113,8 @@ public Response getExecutions(
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
- @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
+ "AGENT"})
public Response getExecution(@ApiParam(value = "execution id", required = true) @PathParam("id") Integer id,
@BeanParam ExecutionsBeanParam executionsBeanParam,
@Context UriInfo uriInfo,
@@ -152,7 +157,8 @@ public Response stopExecution(
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
- @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ @ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
+ "AGENT"})
public Response startExecution(
@ApiParam(value = "Arguments for executing the job") String args,
@Context SecurityContext sc,
@@ -160,6 +166,11 @@ public Response startExecution(
Users user = jWTHelper.getUserPrincipal(sc);
+ // run job as job owner if user is airflow
+ if (user.getUsername().equals(settings.getAirflowUser())) {
+ user = job.getCreator();
+ }
+
Execution exec;
if(!Strings.isNullOrEmpty(job.getJobConfig().getDefaultArgs()) && Strings.isNullOrEmpty(args)) {
exec = executionController.start(job, job.getJobConfig().getDefaultArgs(), user);
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java
index 2236d295a1..87993d6631 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java
@@ -259,7 +259,7 @@ public Response getAllProjects(@Context HttpServletRequest req, @Context Securit
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT},
- allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
+ allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@Produces(MediaType.APPLICATION_JSON)
public Response getProjectByName(@PathParam("projectName") String projectName,
@@ -272,7 +272,7 @@ public Response getProjectByName(@PathParam("projectName") String projectName,
@GET
@Path("/asShared/getProjectInfo/{projectName}")
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
- @ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
+ @ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT"})
@Produces(MediaType.APPLICATION_JSON)
public Response getProjectByNameAsShared(@PathParam("projectName") String projectName,
@Context HttpServletRequest req, @Context SecurityContext sc) throws ProjectException, GenericException {
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java
deleted file mode 100644
index 34e79832c5..0000000000
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/airflow/AirflowJWTManager.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * This file is part of Hopsworks
- * Copyright (C) 2019, Logical Clocks AB. All rights reserved
- *
- * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
- * the GNU Affero General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License along with this program.
- * If not, see .
- */
-
-package io.hops.hopsworks.common.airflow;
-
-import com.auth0.jwt.exceptions.JWTDecodeException;
-import com.auth0.jwt.interfaces.DecodedJWT;
-import io.hops.hopsworks.common.dao.airflow.AirflowDag;
-import io.hops.hopsworks.common.dao.airflow.AirflowDagFacade;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT;
-import io.hops.hopsworks.common.dao.airflow.MaterializedJWTFacade;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID;
-import io.hops.hopsworks.persistence.entity.project.Project;
-import io.hops.hopsworks.common.dao.project.ProjectFacade;
-import io.hops.hopsworks.persistence.entity.user.BbcGroup;
-import io.hops.hopsworks.common.dao.user.UserFacade;
-import io.hops.hopsworks.persistence.entity.user.Users;
-import io.hops.hopsworks.common.hdfs.HdfsUsersController;
-import io.hops.hopsworks.common.security.CertificateMaterializer;
-import io.hops.hopsworks.common.util.DateUtils;
-import io.hops.hopsworks.common.util.Settings;
-import io.hops.hopsworks.exceptions.AirflowException;
-import io.hops.hopsworks.jwt.Constants;
-import io.hops.hopsworks.restutils.RESTCodes;
-import io.hops.hopsworks.jwt.JWTController;
-import io.hops.hopsworks.jwt.SignatureAlgorithm;
-import io.hops.hopsworks.jwt.exception.InvalidationException;
-import io.hops.hopsworks.jwt.exception.JWTException;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FileUtils;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import javax.ejb.AccessTimeout;
-import javax.ejb.DependsOn;
-import javax.ejb.EJB;
-import javax.ejb.Lock;
-import javax.ejb.LockType;
-import javax.ejb.Singleton;
-import javax.ejb.Startup;
-import javax.ejb.Timeout;
-import javax.ejb.Timer;
-import javax.ejb.TimerConfig;
-import javax.ejb.TimerService;
-import javax.ejb.TransactionAttribute;
-import javax.ejb.TransactionAttributeType;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.LinkOption;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.attribute.GroupPrincipal;
-import java.nio.file.attribute.PosixFileAttributeView;
-import java.nio.file.attribute.PosixFilePermission;
-import java.security.GeneralSecurityException;
-import java.sql.SQLException;
-import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-@Singleton
-@Startup
-@TransactionAttribute(TransactionAttributeType.NEVER)
-@DependsOn("Settings")
-public class AirflowJWTManager {
- private final static Logger LOG = Logger.getLogger(AirflowJWTManager.class.getName());
-
- private final static String TOKEN_FILE_SUFFIX = ".jwt";
- private final static Set TOKEN_FILE_PERMISSIONS = new HashSet<>(5);
- static {
- TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_READ);
- TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_WRITE);
- TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.OWNER_EXECUTE);
-
- TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.GROUP_READ);
- TOKEN_FILE_PERMISSIONS.add(PosixFilePermission.GROUP_EXECUTE);
- }
-
- private final TreeSet airflowJWTs = new TreeSet<>(new Comparator() {
- @Override
- public int compare(AirflowJWT t0, AirflowJWT t1) {
- if (t0.equals(t1)) {
- return 0;
- } else {
- if (t0.expiration.isBefore(t1.expiration)) {
- return -1;
- } else if (t0.expiration.isAfter(t1.expiration)) {
- return 1;
- }
- return 0;
- }
- }
- });
-
- @EJB
- private HdfsUsersController hdfsUsersController;
- @EJB
- private Settings settings;
- @EJB
- private JWTController jwtController;
- @EJB
- private AirflowDagFacade airflowDagFacade;
- @EJB
- private CertificateMaterializer certificateMaterializer;
- @EJB
- private MaterializedJWTFacade materializedJWTFacade;
- @EJB
- private UserFacade userFacade;
- @EJB
- private ProjectFacade projectFacade;
- @Resource
- private TimerService timerService;
-
- private GroupPrincipal airflowGroup;
- private volatile boolean initialized = false;
-
- @PostConstruct
- @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
- public void init() {
- Path airflowPath = Paths.get(settings.getAirflowDir());
- // This requires airflow::install to run before hopsworks is deployed.
- if (airflowPath.toFile().isDirectory()) {
- long interval = Math.max(1000L, settings.getJWTExpLeewaySec() * 1000L / 2);
- timerService.createIntervalTimer(10L, interval, new TimerConfig("Airflow init/JWT renewal timer", false));
- }
- }
-
- private void initAirflow() {
- Path airflowPath = Paths.get(settings.getAirflowDir());
- try {
- airflowGroup = Files.getFileAttributeView(airflowPath, PosixFileAttributeView.class,
- LinkOption.NOFOLLOW_LINKS).readAttributes().group();
- try {
- recover();
- } catch (Exception ex) {
- LOG.log(Level.WARNING, "AirflowManager failed to recover, some already running workloads might be disrupted");
- }
- // This is a dummy query to initialize airflowPool metrics for Prometheus
- airflowDagFacade.getAllWithLimit(1);
- initialized = true;
- } catch (IOException | SQLException ex) {
- LOG.log(Level.SEVERE, "Failed to initialize AirflowManager", ex);
- }
- LOG.log(Level.SEVERE, "AirflowManager initialized");
- }
-
- /**
- * Recover security material for Airflow after restart. Read all active material from the database.
- * Check if JWT exists in the local filesystem and it is valid.
- * If not try to create a new one. Finally, materialize X.509 for project specific user.
- */
- private void recover() {
- LOG.log(Level.FINE, "Starting Airflow manager recovery");
- List failed2recover = new ArrayList<>();
- Project project = null;
- Users user = null;
- // Get last known state from storage
- for (MaterializedJWT material : materializedJWTFacade.findAll4Airflow()) {
- LOG.log(Level.FINEST, "Recovering material: " + material.getIdentifier().getProjectId() + " - "
- + material.getIdentifier().getUserId());
- project = projectFacade.find(material.getIdentifier().getProjectId());
- user = userFacade.find(material.getIdentifier().getUserId());
- if (project == null || user == null) {
- LOG.log(Level.WARNING, "Error while recovering Project with ID: " + material.getIdentifier().getProjectId()
- + " and User ID: " + material.getIdentifier().getUserId() + ". Project or user is null");
- failed2recover.add(material);
- continue;
- }
-
- Path tokenFile = Paths.get(getProjectSecretsDirectory(user.getUsername()).toString(),
- getTokenFileName(project.getName(), user.getUsername()));
- AirflowJWT airflowJWT;
- String token = null;
- String materialIdentifier = "Project: " + project.getName() + " - User: " + user.getUsername();
- try {
- // First try to read JWT from the filesystem. We expect most of the cases this will succeed.
- token = FileUtils.readFileToString(tokenFile.toFile(), Charset.defaultCharset());
- DecodedJWT decoded = jwtController.verifyToken(token, settings.getJWTIssuer());
- airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(),
- DateUtils.date2LocalDateTime(decoded.getExpiresAt()), user.getUid());
- airflowJWT.tokenFile = tokenFile;
- airflowJWT.token = token;
- LOG.log(Level.FINE, "Successfully read existing JWT from local filesystem for " + materialIdentifier);
- } catch (IOException | JWTException | JWTDecodeException ex) {
- // JWT does not exist in the filesystem or we cannot read them or it is not valid any longer
- // We will create a new one
- //TODO(Antonis): Not very good that audience is hardcoded, but it is not accessible from hopsworks-common
- String[] audience = new String[]{"api"};
- LocalDateTime expirationDate = DateUtils.getNow().plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS);
- String[] roles = getUserRoles(user);
- try {
- LOG.log(Level.FINEST, "JWT for " + materialIdentifier + " does not exist in the local FS or it is not "
- + "valid any longer, creating new one...");
- Map claims = new HashMap<>(3);
- claims.put(Constants.RENEWABLE, false);
- claims.put(Constants.EXPIRY_LEEWAY, settings.getJWTExpLeewaySec());
- claims.put(Constants.ROLES, roles);
- token = jwtController.createToken(settings.getJWTSigningKeyName(), false, settings.getJWTIssuer(),
- audience, DateUtils.localDateTime2Date(expirationDate),
- DateUtils.localDateTime2Date(DateUtils.getNow()), user.getUsername(),
- claims, SignatureAlgorithm.valueOf(settings.getJWTSignatureAlg()));
- airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(),
- expirationDate, user.getUid());
- airflowJWT.tokenFile = tokenFile;
- airflowJWT.token = token;
- writeTokenToFile(airflowJWT);
- LOG.log(Level.FINE, "Created new JWT for " + materialIdentifier + " and flushed to local FS");
- } catch (IOException ex1) {
- // Managed to create token but failed to write
- LOG.log(Level.WARNING, "Could not write to local FS new JWT for recovered material " + materialIdentifier
- + ". We will invalidate it and won't renew it.", ex1);
- if (token != null) {
- try {
- LOG.log(Level.FINE, "Failed to write JWT for " + materialIdentifier + ". Invalidating it...");
- jwtController.invalidate(token);
- } catch (InvalidationException ex2) {
- // Not much we can do about it
- }
- }
- failed2recover.add(material);
- continue;
- } catch (GeneralSecurityException | JWTException ex1) {
- LOG.log(Level.WARNING, "Tried to recover JWT for " + materialIdentifier + " but we failed. Giving up... " +
- "JWT will not be available for Airflow DAGs", ex1);
- // Initial token is invalid and could not create new. Give up
- failed2recover.add(material);
- continue;
- }
- }
-
- // If everything went fine with JWT, proceed with X.509
- try {
- LOG.log(Level.FINEST, "Materializing X.509 for " + materialIdentifier);
- certificateMaterializer.materializeCertificatesLocalCustomDir(user.getUsername(), project.getName(),
- getProjectSecretsDirectory(user.getUsername()).toString());
- LOG.log(Level.FINE, "Materialized X.509 for " + materialIdentifier);
- airflowJWTs.add(airflowJWT);
- } catch (IOException ex) {
- LOG.log(Level.WARNING, "Could not materialize X.509 for " + materialIdentifier
- + " Invalidating JWT and deleting from FS. JWT and X.509 will not be available for Airflow DAGs.", ex);
- // Could not materialize X.509
- if (token != null) {
- try {
- LOG.log(Level.FINE, "Failed to materialize X.509 for " + materialIdentifier
- + " Invalidating JWT and deleting it from local FS.");
- jwtController.invalidate(token);
- FileUtils.deleteDirectory(getProjectSecretsDirectory(user.getUsername()).toFile());
- } catch (InvalidationException | IOException ex1) {
- // Not much we can do about it
- }
- }
- failed2recover.add(material);
- }
- }
-
- // Remove failed material from persistent storage
- for (MaterializedJWT failed : failed2recover) {
- materializedJWTFacade.delete(failed.getIdentifier());
- }
- }
-
- private String[] getUserRoles(Users p) {
- Collection groupList = p.getBbcGroupCollection();
- String[] roles = new String[groupList.size()];
- int idx = 0;
- for (BbcGroup g : groupList) {
- roles[idx] = g.getGroupName();
- idx++;
- }
- return roles;
- }
-
- private void isInitialized() throws AirflowException {
- if (!initialized) {
- throw new AirflowException(RESTCodes.AirflowErrorCode.AIRFLOW_MANAGER_UNINITIALIZED, Level.WARNING,
- "AirflowManager is not initialized",
- "AirflowManager failed to initialize or Airflow is not deployed");
- }
- }
-
- @Lock(LockType.READ)
- @AccessTimeout(value = 1, unit = TimeUnit.SECONDS)
- public void prepareSecurityMaterial(Users user, Project project, String[] audience) throws AirflowException {
- isInitialized();
- MaterializedJWTID materialID = new MaterializedJWTID(project.getId(), user.getUid(),
- MaterializedJWTID.USAGE.AIRFLOW);
- if (!materializedJWTFacade.exists(materialID)) {
- LocalDateTime expirationDate = DateUtils.getNow().plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS);
- AirflowJWT airflowJWT = new AirflowJWT(user.getUsername(), project.getId(), project.getName(), expirationDate,
- user.getUid());
- try {
- String[] roles = getUserRoles(user);
- MaterializedJWT airflowMaterial = new MaterializedJWT(new MaterializedJWTID(project.getId(), user.getUid(),
- MaterializedJWTID.USAGE.AIRFLOW));
- materializedJWTFacade.persist(airflowMaterial);
- Map claims = new HashMap<>(3);
- claims.put(Constants.RENEWABLE, false);
- claims.put(Constants.EXPIRY_LEEWAY, settings.getJWTExpLeewaySec());
- claims.put(Constants.ROLES, roles);
- String token = jwtController.createToken(settings.getJWTSigningKeyName(), false, settings.getJWTIssuer(),
- audience, DateUtils.localDateTime2Date(expirationDate),
- DateUtils.localDateTime2Date(DateUtils.getNow()), user.getUsername(),
- claims, SignatureAlgorithm.valueOf(settings.getJWTSignatureAlg()));
- String projectAirflowDir = getProjectSecretsDirectory(user.getUsername()).toString();
- airflowJWT.tokenFile = Paths.get(projectAirflowDir, getTokenFileName(project.getName(), user.getUsername()));
-
- airflowJWT.token = token;
- writeTokenToFile(airflowJWT);
- certificateMaterializer.materializeCertificatesLocalCustomDir(user.getUsername(), project.getName(),
- projectAirflowDir);
- airflowJWTs.add(airflowJWT);
- } catch (GeneralSecurityException | JWTException ex) {
- deleteAirflowMaterial(materialID);
- throw new AirflowException(RESTCodes.AirflowErrorCode.JWT_NOT_CREATED, Level.SEVERE,
- "Could not generate Airflow JWT for user " + user.getUsername(), ex.getMessage(), ex);
- } catch (IOException ex) {
- LOG.log(Level.WARNING, "Could not write Airflow JWT for user " + hdfsUsersController
- .getHdfsUserName(project, user), ex);
- deleteAirflowMaterial(materialID);
- try {
- jwtController.invalidate(airflowJWT.token);
- } catch (InvalidationException invEx) {
- LOG.log(Level.FINE, "Could not invalidate Airflow JWT. Skipping...", ex);
- }
- throw new AirflowException(RESTCodes.AirflowErrorCode.JWT_NOT_STORED, Level.SEVERE,
- "Could not store Airflow JWT for user " + hdfsUsersController.getHdfsUserName(project, user),
- ex.getMessage(), ex);
- }
- }
- }
-
- /**
- * Timer bean to periodically (JWT expiration lee way / 2) (a) clean stale JWT and X.509 material for Airflow
- * and (b) renew used JWTs.
- *
- * a. Iterate all the material in memory and clean those that don't have any entry in the database, project has been
- * deleted or those that project exist but user does not own any non-paused DAG in Airflow.
- *
- * b. For a valid JWT, renew it if the time has come (after expiration time and before expiration + expLeeWay)
- *
- * @param timer
- */
- @Lock(LockType.WRITE)
- @AccessTimeout(value = 500)
- @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
- @Timeout
- public void monitorSecurityMaterial(Timer timer) {
- if (!initialized) {
- initAirflow();
- return;
- }
- try {
- LocalDateTime now = DateUtils.getNow();
- // Clean unused token files and X.509 certificates
- cleanStaleSecurityMaterial();
-
- // Renew them
- Set newTokens2Add = new HashSet<>();
- Iterator airflowJWTIt = airflowJWTs.iterator();
- while (airflowJWTIt.hasNext()) {
- AirflowJWT airflowJWT = airflowJWTIt.next();
- // Set is sorted by expiration date
- // If first does not need to be renewed, neither do the rest
- if (airflowJWT.maybeRenew(now)) {
- try {
- LocalDateTime expirationDateTime = now.plus(settings.getJWTLifetimeMs(), ChronoUnit.MILLIS);
- Date expirationDate = DateUtils.localDateTime2Date(expirationDateTime);
- String token = jwtController.renewToken(airflowJWT.token, expirationDate,
- DateUtils.localDateTime2Date(DateUtils.getNow()), true, new HashMap<>(3));
-
- AirflowJWT renewedJWT = new AirflowJWT(airflowJWT.username, airflowJWT.projectId, airflowJWT.projectName,
- expirationDateTime, airflowJWT.uid);
- renewedJWT.tokenFile = airflowJWT.tokenFile;
- renewedJWT.token = token;
-
- airflowJWTIt.remove();
- writeTokenToFile(renewedJWT);
- newTokens2Add.add(renewedJWT);
- } catch (JWTException ex) {
- LOG.log(Level.WARNING, "Could not renew Airflow JWT for " + airflowJWT, ex);
- } catch (IOException ex) {
- LOG.log(Level.WARNING, "Could not write renewed Airflow JWT for " + airflowJWT, ex);
- try {
- jwtController.invalidate(airflowJWT.token);
- } catch (InvalidationException iex) {
- LOG.log(Level.FINE, "Could not invalidate Airflow JWT. SKipping...");
- }
- } catch (Exception ex) {
- LOG.log(Level.SEVERE, "Generic error while renewing Airflow JWTs", ex);
- }
- } else {
- break;
- }
- }
- airflowJWTs.addAll(newTokens2Add);
- } catch (Exception e) {
- LOG.log(Level.SEVERE, "Got an exception while renewing/invalidating airflow jwt token", e);
- }
- }
-
- private Path getProjectSecretsDirectory(String username) {
- return Paths.get(settings.getAirflowDir(), "secrets", generateOwnerSecret(username));
- }
-
- private String generateOwnerSecret(String username) {
- return DigestUtils.sha256Hex(username);
- }
-
- private void writeTokenToFile(AirflowJWT airflowJWT) throws IOException {
- Path parent = airflowJWT.tokenFile.getParent();
- if (!parent.toFile().exists()) {
- parent.toFile().mkdirs();
- Files.setPosixFilePermissions(parent, TOKEN_FILE_PERMISSIONS);
- Files.getFileAttributeView(parent, PosixFileAttributeView.class,
- LinkOption.NOFOLLOW_LINKS).setGroup(airflowGroup);
- }
- FileUtils.writeStringToFile(airflowJWT.tokenFile.toFile(), airflowJWT.token);
- Files.setPosixFilePermissions(airflowJWT.tokenFile, TOKEN_FILE_PERMISSIONS);
- Files.getFileAttributeView(airflowJWT.tokenFile, PosixFileAttributeView.class,
- LinkOption.NOFOLLOW_LINKS).setGroup(airflowGroup);
- }
-
- private void deleteAirflowMaterial(MaterializedJWTID identifier) {
- materializedJWTFacade.delete(identifier);
- }
-
- private String getTokenFileName(String projectName, String username) {
- return projectName + "__" + username + TOKEN_FILE_SUFFIX;
- }
-
- private boolean deleteDirectoryIfEmpty(Path directory) throws IOException {
- File directoryFile = directory.toFile();
- File[] content = directoryFile.listFiles();
- if (content != null && content.length == 0) {
- FileUtils.deleteDirectory(directoryFile);
- return true;
- }
- return false;
- }
-
- private void cleanStaleSecurityMaterial() {
- Iterator airflowJWTsIt = airflowJWTs.iterator();
- while (airflowJWTsIt.hasNext()) {
- AirflowJWT nextElement = airflowJWTsIt.next();
- try {
- MaterializedJWTID materialId = new MaterializedJWTID(nextElement.projectId, nextElement.uid,
- MaterializedJWTID.USAGE.AIRFLOW);
- MaterializedJWT airflowMaterial = materializedJWTFacade.findById(materialId);
- boolean shouldDelete = true;
-
- if (airflowMaterial != null) {
- List ownedDags = airflowDagFacade.filterByOwner(nextElement.username);
- for (AirflowDag dag : ownedDags) {
- if (!dag.getPaused()) {
- shouldDelete = false;
- break;
- }
- }
- }
-
- if (shouldDelete) {
- certificateMaterializer.removeCertificatesLocalCustomDir(nextElement.username, nextElement.projectName,
- getProjectSecretsDirectory(nextElement.username).toString());
-
- FileUtils.deleteQuietly(nextElement.tokenFile.toFile());
- airflowJWTsIt.remove();
- if (airflowMaterial != null) {
- deleteAirflowMaterial(materialId);
- }
- deleteDirectoryIfEmpty(nextElement.tokenFile.getParent());
- }
- } catch (Exception ex) {
- // Catch everything here. We don't want the timer thread to get killed (expunging timer)
- // Be on the safe side and renew the token
- LOG.log(Level.WARNING, "Could not determine if token " + nextElement + " is stale. It will be renewed!", ex);
- }
- }
- }
-
- private class AirflowJWT {
- private final String username;
- private final Integer projectId;
- private final String projectName;
- private final LocalDateTime expiration;
- private final Integer uid;
-
- private String token;
- private Path tokenFile;
-
- private AirflowJWT(String username, Integer projectId, String projectName, LocalDateTime expiration, Integer uid) {
- this.username = username;
- this.projectId = projectId;
- this.projectName = projectName;
- this.expiration = expiration;
- this.uid = uid;
- }
-
- private boolean maybeRenew(LocalDateTime now) {
- return now.isAfter(expiration) || now.isEqual(expiration);
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31 * result + uid;
- result = 31 * result + projectId;
- return result;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o instanceof AirflowJWT) {
- AirflowJWT other = (AirflowJWT) o;
- return uid.equals(other.uid) && projectId.equals(other.projectId);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "Airflow JWT - Project ID: " + projectId + " User ID: " + uid;
- }
- }
-}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java
deleted file mode 100644
index 660cec5266..0000000000
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDag.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * This file is part of Hopsworks
- * Copyright (C) 2018, Logical Clocks AB. All rights reserved
- *
- * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
- * the GNU Affero General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License along with this program.
- * If not, see .
- */
-
-package io.hops.hopsworks.common.dao.airflow;
-
-public class AirflowDag {
-
- private final String id;
- private final Boolean paused;
-
- public AirflowDag(String id, Boolean paused) {
- this.id = id;
- this.paused = paused;
- }
-
- public String getId() {
- return id;
- }
- public Boolean getPaused() {
- return paused;
- }
-
-}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java
deleted file mode 100644
index 504b29fb8a..0000000000
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/AirflowDagFacade.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * This file is part of Hopsworks
- * Copyright (C) 2018, Logical Clocks AB. All rights reserved
- *
- * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
- * the GNU Affero General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License along with this program.
- * If not, see .
- */
-
-package io.hops.hopsworks.common.dao.airflow;
-
-import javax.annotation.Resource;
-import javax.ejb.Stateless;
-import javax.ejb.TransactionAttribute;
-import javax.ejb.TransactionAttributeType;
-import javax.sql.DataSource;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * This is special case of a facade. It does not use the EntityManager.
- *
- * Installation of Airflow is optional. This means that we can't have the
- * airflow persistence unit statically in persistence.xml file as the Connection
- * Pool and JDBC Resource won't exist in Glassfish.
- *
- * All persistence units in persistence.xml *must* exist during application
- * deployment. Otherwise the deployment fails.
- *
- * To tackle this dynamic installation, we skip adding airflow PU in persistence.xml
- * and we inject the JDBC Resource programmatically.
- *
- * The method in this EJB will only be invoked when Airflow is installed, so it is
- * safe to inject jdbc/airflow DataSource.
- *
- * DO NOT invoke the DataSource directly!
- *
- * Look on other facades for examples on how to properly access the database.
- */
-
-@Stateless
-@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
-public class AirflowDagFacade {
-
- private static final Logger LOGGER = Logger.getLogger(AirflowDagFacade.class.getName());
- private static final String DAGS_STATUS_QUERY = "SELECT dag_id, is_paused FROM airflow.dag WHERE owners = ?";
- private static final String GET_ALL_DAGS_WITH_LIMIT_QUERY = "SELECT dag_id, is_paused FROM airflow.dag LIMIT ?";
-
- @Resource(name = "jdbc/airflow")
- private DataSource airflowDataSource;
-
- public List filterByOwner(String owner) throws IOException, SQLException {
- if (owner == null || owner.isEmpty()) {
- throw new IOException("Airflow DAG owner cannot be null or empty");
- }
- List dags = new ArrayList<>();
- try (Connection connection = airflowDataSource.getConnection();
- PreparedStatement stmt = connection.prepareStatement(DAGS_STATUS_QUERY)) {
- stmt.setString(1, owner);
- try (ResultSet dagsRS = stmt.executeQuery()) {
- while (dagsRS.next()) {
- AirflowDag dag = new AirflowDag(dagsRS.getString("dag_id"),
- dagsRS.getBoolean("is_paused"));
- dags.add(dag);
- }
- return dags;
- }
- }
- }
-
- public List getAllWithLimit(Integer limit) throws SQLException {
- List dags = new ArrayList<>();
- try (Connection connection = airflowDataSource.getConnection();
- PreparedStatement stmt = connection.prepareStatement(GET_ALL_DAGS_WITH_LIMIT_QUERY)) {
- stmt.setInt(1, limit);
- try (ResultSet dagsRS = stmt.executeQuery()) {
- while (dagsRS.next()) {
- AirflowDag dag = new AirflowDag(dagsRS.getString("dag_id"),
- dagsRS.getBoolean("is_paused"));
- dags.add(dag);
- }
- return dags;
- }
- }
- }
-}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java
similarity index 75%
rename from hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java
rename to hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java
index a9577a226e..59978ee32b 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/airflow/MaterializedJWTFacade.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/jupyter/MaterializedJWTFacade.java
@@ -14,11 +14,11 @@
* If not, see .
*/
-package io.hops.hopsworks.common.dao.airflow;
+package io.hops.hopsworks.common.dao.jupyter;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID;
+import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT;
+import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWTID;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
@@ -42,14 +42,14 @@ public MaterializedJWT findById(MaterializedJWTID identifier) {
return entityManager.find(MaterializedJWT.class, identifier);
}
- public void persist(MaterializedJWT airflowMaterial) {
- entityManager.persist(airflowMaterial);
+ public void persist(MaterializedJWT jupyterMaterial) {
+ entityManager.persist(jupyterMaterial);
}
public void delete(MaterializedJWTID identifier) {
- MaterializedJWT airflowMaterial = findById(identifier);
- if (airflowMaterial != null) {
- entityManager.remove(airflowMaterial);
+ MaterializedJWT jupyterMaterial = findById(identifier);
+ if (jupyterMaterial != null) {
+ entityManager.remove(jupyterMaterial);
}
}
@@ -58,13 +58,7 @@ public List findAll() {
MaterializedJWT.class);
return query.getResultList();
}
-
- public List findAll4Airflow() {
- return entityManager.createNamedQuery("MaterializedJWT.findByUsage", MaterializedJWT.class)
- .setParameter("usage", MaterializedJWTID.USAGE.AIRFLOW)
- .getResultList();
- }
-
+
public List findAll4Jupyter() {
return entityManager.createNamedQuery("MaterializedJWT.findByUsage", MaterializedJWT.class)
.setParameter("usage", MaterializedJWTID.USAGE.JUPYTER)
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java
index d609b7ba83..b4c99039a8 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/jupyter/JupyterJWTManager.java
@@ -18,7 +18,7 @@
import com.auth0.jwt.exceptions.JWTDecodeException;
import com.auth0.jwt.interfaces.DecodedJWT;
-import io.hops.hopsworks.common.dao.airflow.MaterializedJWTFacade;
+import io.hops.hopsworks.common.dao.jupyter.MaterializedJWTFacade;
import io.hops.hopsworks.common.dao.jupyter.JupyterSettingsFacade;
import io.hops.hopsworks.common.dao.jupyter.config.JupyterFacade;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
@@ -33,8 +33,8 @@
import io.hops.hopsworks.jwt.SignatureAlgorithm;
import io.hops.hopsworks.jwt.exception.InvalidationException;
import io.hops.hopsworks.jwt.exception.JWTException;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT;
-import io.hops.hopsworks.persistence.entity.airflow.MaterializedJWTID;
+import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT;
+import io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWTID;
import io.hops.hopsworks.persistence.entity.jupyter.JupyterProject;
import io.hops.hopsworks.persistence.entity.jupyter.JupyterSettings;
import io.hops.hopsworks.persistence.entity.project.Project;
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
index 69bffd439e..8ff003faaf 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
@@ -838,6 +838,7 @@ public List> addService(Project project, ProjectServiceEnum service, U
case AIRFLOW:
addServiceDataset(project, user, Settings.ServiceDataset.AIRFLOW, dfso, udfso,
Provenance.getDatasetProvCore(projectProvCore, Provenance.MLType.DATASET));
+ addAirflowUser(project);
airflowController.grantAirflowPermissions(project, udfso);
break;
}
@@ -915,6 +916,17 @@ public Future addOnlineFsUser(Project projec
return addServiceUser(project, OnlineFeaturestoreController.ONLINEFS_USERNAME, ProjectRoleTypes.DATA_SCIENTIST);
}
+ public void addAirflowUser(Project project) {
+ Users airflowUser = userFacade.findByUsername(settings.getAirflowUser());
+ ProjectTeamPK stp = new ProjectTeamPK(project.getId(), airflowUser.getEmail());
+ ProjectTeam st = new ProjectTeam(stp);
+ st.setTeamRole(ProjectRoleTypes.DATA_SCIENTIST.getRole());
+ st.setTimestamp(new Date());
+ st.setUser(airflowUser);
+ st.setProject(project);
+ projectTeamFacade.persistProjectTeam(st);
+ }
+
private Future addServiceServing(Project project, Users user,
DistributedFileSystemOps dfso, DistributedFileSystemOps udfso, ProvTypeDTO datasetProvCore)
throws ProjectException, DatasetException, HopsSecurityException,
@@ -962,6 +974,7 @@ private Future addServiceUser(Project projec
throw new HopsSecurityException(RESTCodes.SecurityErrorCode.CERT_CREATION_ERROR, Level.SEVERE,
"failed adding service user to project: " + project.getName() + "owner: " + username, e.getMessage(), e);
}
+
// trigger project team role add handlers
ProjectTeamRoleHandler.runProjectTeamRoleAddMembersHandlers(projectTeamRoleHandlers, project,
Collections.singletonList(serviceUser), ProjectRoleTypes.fromString(st.getTeamRole()), true);
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java
index af200ec0c9..ad89ffa5b9 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/AccessController.java
@@ -25,6 +25,7 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.logging.Logger;
@@ -34,6 +35,8 @@
@TransactionAttribute(TransactionAttributeType.NEVER)
public class AccessController {
private static final Logger LOGGER = Logger.getLogger(AccessController.class.getName());
+ private static final List SERVICE_USERS = Arrays.asList("serving@hopsworks.se", "onlinefs@hopsworks.ai",
+ "airflow@hopsworks.ai");
public boolean hasAccess(Project userProject, Dataset targetDataset) {
if(targetDataset.getProject().equals(userProject)) {
@@ -63,17 +66,15 @@ public boolean hasExtendedAccess(Users user, Project project) {
public Collection getExtendedMembers(Dataset dataset) {
List members = dataset.getProject().getProjectTeamCollection().stream()
- // filter serving and onlinefs user
- .filter(pt -> !pt.getUser().getEmail().equals("serving@hopsworks.se")
- && !pt.getUser().getEmail().equals("onlinefs@hopsworks.ai"))
+ // filter service users: onlinefs, serving, and airflow
+ .filter(pt -> !SERVICE_USERS.contains(pt.getUser().getEmail()))
.collect(Collectors.toCollection(ArrayList::new));
//get members of projects that this dataset has been shared with
List sharedDatasets = dataset.getDatasetSharedWithCollection().stream()
.filter(DatasetSharedWith::getAccepted)
.flatMap((sds) -> sds.getProject().getProjectTeamCollection().stream())
- // filter serving and onlinefs user
- .filter(pt -> !pt.getUser().getEmail().equals("serving@hopsworks.se")
- && !pt.getUser().getEmail().equals("onlinefs@hopsworks.ai"))
+ // filter service users: onlinefs, serving, and airflow
+ .filter(pt -> !SERVICE_USERS.contains(pt.getUser().getEmail()))
.collect(Collectors.toCollection(ArrayList::new));
members.addAll(sharedDatasets);
return members;
diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java
similarity index 97%
rename from hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java
rename to hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java
index e0062e71c0..b3e84554f3 100644
--- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWT.java
+++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWT.java
@@ -14,7 +14,7 @@
* If not, see .
*/
-package io.hops.hopsworks.persistence.entity.airflow;
+package io.hops.hopsworks.persistence.entity.jupyter;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java
similarity index 97%
rename from hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java
rename to hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java
index 618d9d3f03..8e3f0576cf 100644
--- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/airflow/MaterializedJWTID.java
+++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/jupyter/MaterializedJWTID.java
@@ -14,7 +14,7 @@
* If not, see .
*/
-package io.hops.hopsworks.persistence.entity.airflow;
+package io.hops.hopsworks.persistence.entity.jupyter;
import javax.persistence.Column;
import javax.persistence.Embeddable;
@@ -28,7 +28,6 @@ public class MaterializedJWTID implements Serializable {
// Order is important. Always append!
public enum USAGE {
- AIRFLOW,
JUPYTER
}
diff --git a/hopsworks-persistence/src/main/resources/META-INF/persistence.xml b/hopsworks-persistence/src/main/resources/META-INF/persistence.xml
index 560df9b98d..004a2ebd2a 100644
--- a/hopsworks-persistence/src/main/resources/META-INF/persistence.xml
+++ b/hopsworks-persistence/src/main/resources/META-INF/persistence.xml
@@ -41,7 +41,7 @@
org.eclipse.persistence.jpa.PersistenceProvider
jdbc/hopsworks
- io.hops.hopsworks.persistence.entity.airflow.MaterializedJWT
+ io.hops.hopsworks.persistence.entity.jupyter.MaterializedJWT
io.hops.hopsworks.persistence.entity.certificates.UserCerts
io.hops.hopsworks.persistence.entity.command.SystemCommand
io.hops.hopsworks.persistence.entity.command.SystemCommandArguments
diff --git a/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js b/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js
index 217743c78d..2d9488f642 100644
--- a/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js
+++ b/hopsworks-web/yo/app/scripts/controllers/projectCtrl.js
@@ -769,17 +769,11 @@ angular.module('hopsWorksApp')
};
self.connectToAirflow = function () {
- AirflowService.storeAirflowJWT(self.projectId).then(
- function (success) {
- // Open airlfow
- var newTab = $window.open('about:blank', '_blank');
- $http.get(getApiLocationBase() + "/airflow").then ( function (response) {
+ // Open airlfow
+ var newTab = $window.open('about:blank', '_blank');
+ $http.get(getApiLocationBase() + "/airflow").then ( function (response) {
newTab.location.href = getApiLocationBase() + "/airflow/admin";
- })
- }, function (error) {
- growl.error(error.data.errorMsg, {title: 'Error', ttl: 5000});
- }
- )
+ })
};
var kibanaNavVarInitKey = "hopsworks.kibana.navbar.set";
diff --git a/hopsworks-web/yo/app/scripts/services/AirflowService.js b/hopsworks-web/yo/app/scripts/services/AirflowService.js
index 9448c850e7..698e294ca8 100644
--- a/hopsworks-web/yo/app/scripts/services/AirflowService.js
+++ b/hopsworks-web/yo/app/scripts/services/AirflowService.js
@@ -23,9 +23,6 @@
angular.module('hopsWorksApp')
.factory('AirflowService', ['$http', function ($http) {
return {
- storeAirflowJWT: function (projectId) {
- return $http.post('/api/project/' + projectId + "/airflow/jwt")
- },
logout: function() {
$http.get(getApiLocationBase() + '/airflow/admin/airflow/logout').then(
function successCallback(response) {