Skip to content

Commit

Permalink
fix: port commits from 0.4 to main (#684)
Browse files Browse the repository at this point in the history
* fix: provide a mechanism to contact optimus server and upstream optimus server (#624)

* fix: keep things backward compatible for other deployments depending on the upgraded optimus server. (#629)

* fix: pull image if not present locally on cluster (#647)

* fix: Error handling airflow events release (#669)

* fix: handle job run input if job_run is not registered; use schedule time as fallback for execution time

* fix: handle job run input if job_run is not registered; use schedule time as fallback for execution time

* fix: test cases

* fix: improve the job event persistence failure log msg

Co-authored-by: Sandeep Bhardwaj <[email protected]>
Co-authored-by: Yash Bhardwaj <[email protected]>
  • Loading branch information
3 people authored Dec 8, 2022
1 parent 84368a7 commit 2c3e2b8
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 167 deletions.
76 changes: 65 additions & 11 deletions api/handler/v1beta1/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/google/uuid"
"github.com/odpf/salt/log"
Expand Down Expand Up @@ -95,30 +96,45 @@ func (sv *JobRunServiceServer) JobRunInput(ctx context.Context, req *pb.JobRunIn

scheduledAt := req.GetScheduledAt().AsTime()

var jobFetchError error
var jobRunSpec models.JobRunSpec
if req.JobrunId == "" {
jobRunSpec, err = sv.monitoringService.GetJobRunByScheduledAt(ctx, namespaceSpec, jobSpec, scheduledAt)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to get JobRun by ScheduledAt for job %s", err.Error(), jobSpec.Name)
jobRunSpec, jobFetchError = sv.monitoringService.GetJobRunByScheduledAt(ctx, namespaceSpec, jobSpec, scheduledAt)
if jobFetchError != nil {
sv.l.Error(status.Errorf(codes.Internal, "%s: failed to get JobRun by ScheduledAt for job %s", jobFetchError.Error(), jobSpec.Name).Error())
}
} else {
jobRunID, err := uuid.Parse(req.JobrunId)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: failed to parse uuid of job %s", err.Error(), req.JobrunId)
jobRunID, jobFetchError := uuid.Parse(req.JobrunId)
if jobFetchError != nil {
sv.l.Error(status.Errorf(codes.InvalidArgument, "%s: failed to parse uuid of job %s", jobFetchError.Error(), req.JobrunId).Error())
} else {
jobRunSpec, jobFetchError = sv.monitoringService.GetJobRunByRunID(ctx, jobRunID, jobSpec)
if jobFetchError != nil {
sv.l.Error(status.Errorf(codes.Internal, "%s: failed to get JobRun by jobRunId::%s for job %s ", jobFetchError.Error(), req.JobrunId, jobSpec.Name).Error())
}
}
jobRunSpec, err = sv.monitoringService.GetJobRunByRunID(ctx, jobRunID, jobSpec)
}

var jobRunSpecData []models.JobRunSpecData

if jobFetchError != nil || len(jobRunSpec.Data) == 0 {
// use scheduled time as execution time to avoid JobRunInput failures
jobRunSpecData, err = getJobRunSpecData(scheduledAt, scheduledAt, jobSpec)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to get JobRun by jobRunId::%s for job %s ", err.Error(), req.JobrunId, jobSpec.Name)
return nil, status.Errorf(codes.Internal, "%s: job has an invalid window Config, or wrong scheduleTime:%s %s:%s", err.Error(), scheduledAt.String(), jobSpec.Name, namespaceSpec.Name)
}
} else {
jobRunSpecData = jobRunSpec.Data
}

secrets, err := sv.secretService.GetSecrets(ctx, namespaceSpec)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to get secrets %s:%s", err.Error(), jobSpec.Name, namespaceSpec.Name)
}

jobRunInput, err := sv.jobRunInputCompiler.Compile(ctx,
namespaceSpec, secrets, jobSpec, scheduledAt,
jobRunSpec.Data, instanceType, instanceName)
jobRunSpecData, instanceType, instanceName)

if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to compile instance of job %s", err.Error(), jobSpec.Name)
Expand All @@ -131,6 +147,40 @@ func (sv *JobRunServiceServer) JobRunInput(ctx context.Context, req *pb.JobRunIn
}, nil
}

// getJobRunSpecData assembles the instance data entries for a job run from the
// job's task window and the supplied times.
//
// Both window boundaries are computed from scheduledAt through
// jobSpec.Task.Window; executedAt is used only for the execution-time entry.
// The returned slice contains, in order: execution time, window start, window
// end and the job's resource destination, each marked as an env-type value.
// If either window computation fails, that error is returned unchanged along
// with a nil slice.
func getJobRunSpecData(executedAt time.Time, scheduledAt time.Time, jobSpec models.JobSpec) ([]models.JobRunSpecData, error) {
	windowStart, err := jobSpec.Task.Window.GetStartTime(scheduledAt)
	if err != nil {
		return nil, err
	}

	windowEnd, err := jobSpec.Task.Window.GetEndTime(scheduledAt)
	if err != nil {
		return nil, err
	}

	// All time values share the same layout.
	layout := models.InstanceScheduledAtTimeLayout

	specData := make([]models.JobRunSpecData, 0, 4)
	specData = append(specData, models.JobRunSpecData{
		Name:  models.ConfigKeyExecutionTime,
		Value: executedAt.Format(layout),
		Type:  models.InstanceDataTypeEnv,
	})
	specData = append(specData, models.JobRunSpecData{
		Name:  models.ConfigKeyDstart,
		Value: windowStart.Format(layout),
		Type:  models.InstanceDataTypeEnv,
	})
	specData = append(specData, models.JobRunSpecData{
		Name:  models.ConfigKeyDend,
		Value: windowEnd.Format(layout),
		Type:  models.InstanceDataTypeEnv,
	})
	specData = append(specData, models.JobRunSpecData{
		Name:  models.ConfigKeyDestination,
		Value: jobSpec.ResourceDestination,
		Type:  models.InstanceDataTypeEnv,
	})

	return specData, nil
}

func (sv *JobRunServiceServer) JobStatus(ctx context.Context, req *pb.JobStatusRequest) (*pb.JobStatusResponse, error) {
projSpec, err := sv.projectService.Get(ctx, req.GetProjectName())
if err != nil {
Expand Down Expand Up @@ -200,11 +250,15 @@ func (sv *JobRunServiceServer) JobRun(ctx context.Context, req *pb.JobRunRequest
}

func (*JobRunServiceServer) GetWindow(_ context.Context, req *pb.GetWindowRequest) (*pb.GetWindowResponse, error) {
// TODO the default version to be deprecated & made mandatory in future releases
version := 1
if err := req.GetScheduledAt().CheckValid(); err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to parse schedule time %s", err.Error(), req.GetScheduledAt())
}

window, err := models.NewWindow(int(req.Version), req.GetTruncateTo(), req.GetOffset(), req.GetSize())
if req.Version != 0 {
version = int(req.Version)
}
window, err := models.NewWindow(version, req.GetTruncateTo(), req.GetOffset(), req.GetSize())
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 2c3e2b8

Please sign in to comment.