Skip to content

Commit

Permalink
[HWORKS-697] Add airflow user to project (#1520) (#1408)
Browse files Browse the repository at this point in the history
  • Loading branch information
gibchikafa authored Oct 5, 2023
1 parent d2a878f commit 53737fb
Show file tree
Hide file tree
Showing 17 changed files with 73 additions and 759 deletions.
6 changes: 6 additions & 0 deletions hopsworks-IT/src/test/ruby/spec/airflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
with_valid_project
end

it 'should add airflow user to project' do
get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/projectMembers"
airflow_member = json_body.detect { |e| e[:user][:email] == "[email protected]" }
expect(airflow_member).not_to be_nil
end

it "should be able to compose DAG" do
get "#{ENV['HOPSWORKS_API']}/project/#{@project[:id]}/airflow/secretDir"
expect_status_details(200)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.hops.hopsworks.api.jwt.JWTHelper;
import io.hops.hopsworks.common.airflow.AirflowController;
import io.hops.hopsworks.common.airflow.AirflowDagDTO;
import io.hops.hopsworks.common.airflow.AirflowJWTManager;
import io.hops.hopsworks.common.dao.project.ProjectFacade;
import io.hops.hopsworks.exceptions.AirflowException;
import io.hops.hopsworks.jwt.annotation.JWTRequired;
Expand All @@ -34,6 +33,7 @@
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.enterprise.context.RequestScoped;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
Expand All @@ -51,49 +51,38 @@ public class AirflowService {
@EJB
private JWTHelper jwtHelper;
@EJB
private AirflowJWTManager airflowJWTManager;
@EJB
private AirflowController airflowController;
private Integer projectId;
// No @EJB annotation for Project, it's injected explicitly in ProjectService.
private Project project;


// Audience for Airflow JWTs
private static final String[] JWT_AUDIENCE = new String[]{Audience.API};

public AirflowService() {
}

public void setProjectId(Integer projectId) {
this.projectId = projectId;
this.project = this.projectFacade.find(projectId);
}

public Integer getProjectId() {
return projectId;
}

@POST
@Path("/jwt")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
@ApiOperation(value = "Generate a JWT for Airflow usage and store it in project's secret directory in Airflow")
public Response storeAirflowJWT(@Context SecurityContext sc) throws AirflowException {
Users user = jwtHelper.getUserPrincipal(sc);
airflowJWTManager.prepareSecurityMaterial(user, project, JWT_AUDIENCE);
return Response.noContent().build();
}

@POST
@Path("/dag")
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens={Audience.API}, allowedUserRoles={"HOPS_ADMIN", "HOPS_USER"})
@JWTRequired(acceptedTokens = {Audience.API}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
@ApiOperation(value = "Generate an Airflow Python DAG file from a DAG definition")
public Response composeDAG(AirflowDagDTO dagDefinition, @Context SecurityContext sc) throws AirflowException {
public Response composeDAG(AirflowDagDTO dagDefinition,
@Context HttpServletRequest req,
@Context SecurityContext sc) throws AirflowException {
Users user = jwtHelper.getUserPrincipal(sc);
airflowController.composeDAG(project, user, dagDefinition);
return Response.ok().build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public JobsResource setProject(Integer projectId) {
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
public Response getAll(
@BeanParam Pagination pagination,
Expand All @@ -136,7 +136,7 @@ public Response getAll(
@Produces(MediaType.APPLICATION_JSON)
@AllowedProjectRoles({AllowedProjectRoles.DATA_OWNER, AllowedProjectRoles.DATA_SCIENTIST})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
public Response getJob(@PathParam("name") String name,
@BeanParam JobsBeanParam jobsBeanParam,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.hops.hopsworks.common.api.ResourceRequest;
import io.hops.hopsworks.common.jobs.JobLogDTO;
import io.hops.hopsworks.common.jobs.execution.ExecutionController;
import io.hops.hopsworks.common.util.Settings;
import io.hops.hopsworks.exceptions.GenericException;
import io.hops.hopsworks.exceptions.JobException;
import io.hops.hopsworks.exceptions.ProjectException;
Expand Down Expand Up @@ -67,6 +68,8 @@ public class ExecutionsResource {
private ExecutionController executionController;
@EJB
private ExecutionsBuilder executionsBuilder;
@EJB
private Settings settings;


@EJB
Expand All @@ -85,7 +88,8 @@ public ExecutionsResource setJob(Jobs job) {
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
"AGENT"})
public Response getExecutions(
@BeanParam Pagination pagination,
@BeanParam ExecutionsBeanParam executionsBeanParam,
Expand All @@ -109,7 +113,8 @@ public Response getExecutions(
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
"AGENT"})
public Response getExecution(@ApiParam(value = "execution id", required = true) @PathParam("id") Integer id,
@BeanParam ExecutionsBeanParam executionsBeanParam,
@Context UriInfo uriInfo,
Expand Down Expand Up @@ -152,14 +157,20 @@ public Response stopExecution(
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER",
"AGENT"})
public Response startExecution(
@ApiParam(value = "Arguments for executing the job") String args,
@Context SecurityContext sc,
@Context UriInfo uriInfo) throws JobException, GenericException, ServiceException, ProjectException {

Users user = jWTHelper.getUserPrincipal(sc);

// run job as job owner if user is airflow
if (user.getUsername().equals(settings.getAirflowUser())) {
user = job.getCreator();
}

Execution exec;
if(!Strings.isNullOrEmpty(job.getJobConfig().getDefaultArgs()) && Strings.isNullOrEmpty(args)) {
exec = executionController.start(job, job.getJobConfig().getDefaultArgs(), user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public Response getAllProjects(@Context HttpServletRequest req, @Context Securit
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
@ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT},
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"})
allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER", "AGENT"})
@AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER})
@Produces(MediaType.APPLICATION_JSON)
public Response getProjectByName(@PathParam("projectName") String projectName,
Expand All @@ -272,7 +272,7 @@ public Response getProjectByName(@PathParam("projectName") String projectName,
@GET
@Path("/asShared/getProjectInfo/{projectName}")
@JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
@ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER"})
@ApiKeyRequired( acceptedScopes = {ApiScope.PROJECT}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT"})
@Produces(MediaType.APPLICATION_JSON)
public Response getProjectByNameAsShared(@PathParam("projectName") String projectName,
@Context HttpServletRequest req, @Context SecurityContext sc) throws ProjectException, GenericException {
Expand Down
Loading

0 comments on commit 53737fb

Please sign in to comment.