Skip to content

Commit

Permalink
Refactor model.Job and /submit payload + make canary tests work loc…
Browse files Browse the repository at this point in the history
…ally (#1430)
  • Loading branch information
enricorotundo authored Dec 8, 2022
1 parent 64ebc8b commit 7a107b3
Show file tree
Hide file tree
Showing 69 changed files with 884 additions and 681 deletions.
36 changes: 18 additions & 18 deletions cmd/bacalhau/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,41 +167,41 @@ func create(cmd *cobra.Command, cmdArgs []string, OC *CreateOptions) error { //n

// Warn on fields with data that will be ignored
var unusedFieldList []string
if j.ClientID != "" {
if j.Metadata.ClientID != "" {
unusedFieldList = append(unusedFieldList, "ClientID")
j.ClientID = ""
j.Metadata.ClientID = ""
}
if !reflect.DeepEqual(j.CreatedAt, time.Time{}) {
if !reflect.DeepEqual(j.Metadata.CreatedAt, time.Time{}) {
unusedFieldList = append(unusedFieldList, "CreatedAt")
j.CreatedAt = time.Time{}
j.Metadata.CreatedAt = time.Time{}
}
if !reflect.DeepEqual(j.ExecutionPlan, model.JobExecutionPlan{}) {
if !reflect.DeepEqual(j.Spec.ExecutionPlan, model.JobExecutionPlan{}) {
unusedFieldList = append(unusedFieldList, "Verification")
j.ExecutionPlan = model.JobExecutionPlan{}
j.Spec.ExecutionPlan = model.JobExecutionPlan{}
}
if len(j.Events) != 0 {
if len(j.Status.Events) != 0 {
unusedFieldList = append(unusedFieldList, "Events")
j.Events = nil
j.Status.Events = nil
}
if j.ID != "" {
if j.Metadata.ID != "" {
unusedFieldList = append(unusedFieldList, "ID")
j.ID = ""
j.Metadata.ID = ""
}
if len(j.LocalEvents) != 0 {
if len(j.Status.LocalEvents) != 0 {
unusedFieldList = append(unusedFieldList, "LocalEvents")
j.LocalEvents = nil
j.Status.LocalEvents = nil
}
if j.RequesterNodeID != "" {
if j.Status.Requester.RequesterNodeID != "" {
unusedFieldList = append(unusedFieldList, "RequesterNodeID")
j.RequesterNodeID = ""
j.Status.Requester.RequesterNodeID = ""
}
if len(j.RequesterPublicKey) != 0 {
if len(j.Status.Requester.RequesterPublicKey) != 0 {
unusedFieldList = append(unusedFieldList, "RequesterPublicKey")
j.RequesterPublicKey = nil
j.Status.Requester.RequesterPublicKey = nil
}
if !reflect.DeepEqual(j.State, model.JobState{}) {
if !reflect.DeepEqual(j.Status.State, model.JobState{}) {
unusedFieldList = append(unusedFieldList, "State")
j.State = model.JobState{}
j.Status.State = model.JobState{}
}

// Warn on fields with data that will be ignored
Expand Down
2 changes: 1 addition & 1 deletion cmd/bacalhau/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *CreateSuite) TestCreateFromStdin() {
_, out, err = ExecuteTestCobraCommand(s.T(), "describe",
"--api-host", host,
"--api-port", port,
job.ID,
job.Metadata.ID,
)

require.NoError(s.T(), err, "Error describing job.")
Expand Down
22 changes: 11 additions & 11 deletions cmd/bacalhau/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,38 +108,38 @@ func describe(cmd *cobra.Command, cmdArgs []string, OD *DescribeOptions) error {
Fatal(cmd, "", 1)
}

shardStates, err := GetAPIClient().GetJobState(ctx, j.ID)
shardStates, err := GetAPIClient().GetJobState(ctx, j.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Failure retrieving job states '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Failure retrieving job states '%s': %s\n", j.Metadata.ID, err), 1)
}

jobEvents, err := GetAPIClient().GetEvents(ctx, j.ID)
jobEvents, err := GetAPIClient().GetEvents(ctx, j.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.Metadata.ID, err), 1)
}

localEvents, err := GetAPIClient().GetLocalEvents(ctx, j.ID)
localEvents, err := GetAPIClient().GetLocalEvents(ctx, j.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.Metadata.ID, err), 1)
}

jobDesc := j
jobDesc.State = shardStates
jobDesc.Status.State = shardStates

if OD.IncludeEvents {
jobDesc.Events = jobEvents
jobDesc.LocalEvents = localEvents
jobDesc.Status.Events = jobEvents
jobDesc.Status.LocalEvents = localEvents
}

b, err := model.JSONMarshalWithMax(jobDesc)
if err != nil {
Fatal(cmd, fmt.Sprintf("Failure marshaling job description '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Failure marshaling job description '%s': %s\n", j.Metadata.ID, err), 1)
}

// Convert Json to Yaml
y, err := yaml.JSONToYAML(b)
if err != nil {
Fatal(cmd, fmt.Sprintf("Able to marshal to YAML but not JSON whatttt '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Able to marshal to YAML but not JSON whatttt '%s': %s\n", j.Metadata.ID, err), 1)
}

cmd.Print(string(y))
Expand Down
18 changes: 9 additions & 9 deletions cmd/bacalhau/describe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func (suite *DescribeSuite) TestDescribeJob() {
_, out, err = ExecuteTestCobraCommand(suite.T(), "describe",
"--api-host", host,
"--api-port", port,
submittedJob.ID,
submittedJob.Metadata.ID,
)
require.NoError(suite.T(), err, "Error in describing job: %+v", err)

err = model.YAMLUnmarshalWithMax([]byte(out), returnedJob)
require.NoError(suite.T(), err, "Error in unmarshalling description: %+v", err)
require.Equal(suite.T(), submittedJob.ID, returnedJob.ID, "IDs do not match.")
require.Equal(suite.T(), submittedJob.Metadata.ID, returnedJob.Metadata.ID, "IDs do not match.")
require.Equal(suite.T(),
submittedJob.Spec.Docker.Entrypoint[0],
returnedJob.Spec.Docker.Entrypoint[0],
Expand All @@ -98,14 +98,14 @@ func (suite *DescribeSuite) TestDescribeJob() {
// Job Id in the middle
_, out, err = ExecuteTestCobraCommand(suite.T(), "describe",
"--api-host", host,
submittedJob.ID,
submittedJob.Metadata.ID,
"--api-port", port,
)

require.NoError(suite.T(), err, "Error in describing job: %+v", err)
err = model.YAMLUnmarshalWithMax([]byte(out), returnedJob)
require.NoError(suite.T(), err, "Error in unmarshalling description: %+v", err)
require.Equal(suite.T(), submittedJob.ID, returnedJob.ID, "IDs do not match.")
require.Equal(suite.T(), submittedJob.Metadata.ID, returnedJob.Metadata.ID, "IDs do not match.")
require.Equal(suite.T(),
submittedJob.Spec.Docker.Entrypoint[0],
returnedJob.Spec.Docker.Entrypoint[0],
Expand All @@ -114,14 +114,14 @@ func (suite *DescribeSuite) TestDescribeJob() {
// Short job id
_, out, err = ExecuteTestCobraCommand(suite.T(), "describe",
"--api-host", host,
submittedJob.ID[0:model.ShortIDLength],
submittedJob.Metadata.ID[0:model.ShortIDLength],
"--api-port", port,
)

require.NoError(suite.T(), err, "Error in describing job: %+v", err)
err = model.YAMLUnmarshalWithMax([]byte(out), returnedJob)
require.NoError(suite.T(), err, "Error in unmarshalling description: %+v", err)
require.Equal(suite.T(), submittedJob.ID, returnedJob.ID, "IDs do not match.")
require.Equal(suite.T(), submittedJob.Metadata.ID, returnedJob.Metadata.ID, "IDs do not match.")
require.Equal(suite.T(),
submittedJob.Spec.Docker.Entrypoint[0],
returnedJob.Spec.Docker.Entrypoint[0],
Expand Down Expand Up @@ -159,7 +159,7 @@ func (suite *DescribeSuite) TestDescribeJobIncludeEvents() {

var args []string

args = append(args, "describe", "--api-host", host, "--api-port", port, submittedJob.ID)
args = append(args, "describe", "--api-host", host, "--api-port", port, submittedJob.Metadata.ID)
if tc.includeEvents {
args = append(args, "--include-events")
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *DescribeSuite) TestDescribeJobEdgeCases() {

// If describeID is empty, should return use submitted ID. Otherwise, use describeID
if tc.describeIDEdgecase == "" {
jobID = submittedJob.ID
jobID = submittedJob.Metadata.ID
} else {
jobID = tc.describeIDEdgecase
}
Expand All @@ -240,7 +240,7 @@ func (s *DescribeSuite) TestDescribeJobEdgeCases() {

err = model.YAMLUnmarshalWithMax([]byte(out), &returnedJob)
require.NoError(s.T(), err, "Error in unmarshalling description: %+v", err)
require.Equal(s.T(), submittedJob.ID, returnedJob.ID, "IDs do not match.")
require.Equal(s.T(), submittedJob.Metadata.ID, returnedJob.Metadata.ID, "IDs do not match.")
require.Equal(s.T(),
submittedJob.Spec.Docker.Entrypoint[0],
returnedJob.Spec.Docker.Entrypoint[0],
Expand Down
6 changes: 3 additions & 3 deletions cmd/bacalhau/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,10 @@ func (s *DockerRunSuite) TestRun_CreatedAt() {

j := testutils.GetJobFromTestOutput(ctx, s.T(), c, out)

require.LessOrEqual(s.T(), j.CreatedAt, time.Now(), "Created at time is not less than or equal to now.")
require.LessOrEqual(s.T(), j.Metadata.CreatedAt, time.Now(), "Created at time is not less than or equal to now.")

oldStartTime, _ := time.Parse(time.RFC3339, "2021-01-01T01:01:01+00:00")
require.GreaterOrEqual(s.T(), j.CreatedAt, oldStartTime, "Created at time is not greater or equal to 2022-01-01.")
require.GreaterOrEqual(s.T(), j.Metadata.CreatedAt, oldStartTime, "Created at time is not greater or equal to 2022-01-01.")
}()

}
Expand Down Expand Up @@ -768,7 +768,7 @@ func (s *DockerRunSuite) TestRun_Deterministic_Verifier() {
return "", err
}
j := testutils.GetJobFromTestOutput(ctx, s.T(), apiClient, out)
return j.ID, nil
return j.Metadata.ID, nil
}

// test that we must have more than one node to run the job
Expand Down
4 changes: 2 additions & 2 deletions cmd/bacalhau/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ func summarizeJob(ctx context.Context, j *model.Job, OL *ListOptions) (table.Row
resultSummary := job.ComputeResultsSummary(j)

row := table.Row{
shortenTime(OL.OutputWide, j.CreatedAt),
shortID(OL.OutputWide, j.ID),
shortenTime(OL.OutputWide, j.Metadata.CreatedAt),
shortID(OL.OutputWide, j.Metadata.ID),
shortenString(OL.OutputWide, strings.Join(jobDesc, " ")),
shortenString(OL.OutputWide, stateSummary),
shortenString(OL.OutputWide, verifiedSummary),
Expand Down
8 changes: 4 additions & 4 deletions cmd/bacalhau/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (suite *ListSuite) TestList_IdFilter() {
var err error
j := publicapi.MakeNoopJob()
j, err = c.Submit(ctx, j, nil)
jobIds = append(jobIds, shortID(false, j.ID))
jobLongIds = append(jobIds, j.ID)
jobIds = append(jobIds, shortID(false, j.Metadata.ID))
jobLongIds = append(jobIds, j.Metadata.ID)
require.NoError(suite.T(), err)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func (suite *ListSuite) TestList_IdFilter() {

require.NoError(suite.T(), err)

require.Contains(suite.T(), firstItem.ID, jobLongIds[0], "The filtered job id was not found in the response")
require.Contains(suite.T(), firstItem.Metadata.ID, jobLongIds[0], "The filtered job id was not found in the response")
require.Equal(suite.T(), 1, len(response.Jobs), "The list of jobs is not strictly filtered to the requested job id")
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func (suite *ListSuite) TestList_SortFlags() {
j := publicapi.MakeNoopJob()
j, err = c.Submit(ctx, j, nil)
require.NoError(suite.T(), err)
jobIDs = append(jobIDs, shortID(false, j.ID))
jobIDs = append(jobIDs, shortID(false, j.Metadata.ID))

// all the middle jobs can have the same timestamp
// but we need the first and last to differ
Expand Down
35 changes: 18 additions & 17 deletions cmd/bacalhau/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@ func ExecuteJob(ctx context.Context,
// because all code after this point is related to
// "wait for the job to finish" (via WaitAndPrintResultsToUser)
if !runtimeSettings.WaitForJobToFinish {
cmd.Print(j.ID + "\n")
cmd.Print(j.Metadata.ID + "\n")
return nil
}

// if we are in --id-only mode - print the id
if runtimeSettings.PrintJobIDOnly {
cmd.Print(j.ID + "\n")
cmd.Print(j.Metadata.ID + "\n")
}

// if we are only printing the id, set the rest of the output to "quiet",
Expand All @@ -339,15 +339,15 @@ func ExecuteJob(ctx context.Context,
}
}

jobReturn, found, err := apiClient.Get(ctx, j.ID)
jobReturn, found, err := apiClient.Get(ctx, j.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Error getting job: %s", err), 1)
}
if !found {
Fatal(cmd, fmt.Sprintf("Weird. Just ran the job, but we couldn't find it. Should be impossible. ID: %s", j.ID), 1)
Fatal(cmd, fmt.Sprintf("Weird. Just ran the job, but we couldn't find it. Should be impossible. ID: %s", j.Metadata.ID), 1)
}

js, err := apiClient.GetJobState(ctx, jobReturn.ID)
js, err := apiClient.GetJobState(ctx, jobReturn.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Error getting job state: %s", err), 1)
}
Expand Down Expand Up @@ -404,7 +404,7 @@ To download the results, execute:
To get more details about the run, execute:
%s%s describe %s
`, indentOne, getCommandLineExecutable(), j.ID, indentOne, getCommandLineExecutable(), j.ID)
`, indentOne, getCommandLineExecutable(), j.Metadata.ID, indentOne, getCommandLineExecutable(), j.Metadata.ID)

// Have to do a final Sprintf so we can inject the resultsCID into the right place
if resultsCID != "" {
Expand All @@ -419,7 +419,7 @@ To get more details about the run, execute:
ctx,
cm,
cmd,
j.ID,
j.Metadata.ID,
downloadSettings,
)
if err != nil {
Expand Down Expand Up @@ -447,7 +447,7 @@ func downloadResultsHandler(
}
}

results, err := GetAPIClient().GetResults(ctx, j.ID)
results, err := GetAPIClient().GetResults(ctx, j.Metadata.ID)
if err != nil {
return err
}
Expand All @@ -456,7 +456,7 @@ func downloadResultsHandler(
return fmt.Errorf("no results found")
}

processedDownloadSettings, err := processDownloadSettings(downloadSettings, j.ID)
processedDownloadSettings, err := processDownloadSettings(downloadSettings, j.Metadata.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -544,15 +544,15 @@ func ReadFromStdinIfAvailable(cmd *cobra.Command, args []string) ([]byte, error)

//nolint:gocyclo,funlen // Better way to do this, Go doesn't have a switch on type
func WaitAndPrintResultsToUser(ctx context.Context, cmd *cobra.Command, j *model.Job, quiet bool) error {
if j == nil || j.ID == "" {
if j == nil || j.Metadata.ID == "" {
return errors.New("No job returned from the server.")
}
getMoreInfoString := fmt.Sprintf(`
To get more information at any time, run:
bacalhau describe %s`, j.ID)
bacalhau describe %s`, j.Metadata.ID)

if !quiet {
cmd.Printf("Job successfully submitted. Job ID: %s\n", j.ID)
cmd.Printf("Job successfully submitted. Job ID: %s\n", j.Metadata.ID)
cmd.Printf("Checking job status... (Enter Ctrl+C to exit at any time, your job will continue running):\n\n")
}

Expand All @@ -567,9 +567,9 @@ To get more information at any time, run:

time.Sleep(1 * time.Second)

jobEvents, err := GetAPIClient().GetEvents(ctx, j.ID)
jobEvents, err := GetAPIClient().GetEvents(ctx, j.Metadata.ID)
if err != nil {
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Failure retrieving job events '%s': %s\n", j.Metadata.ID, err), 1)
}

// Capture Ctrl+C if the user wants to finish early the job
Expand Down Expand Up @@ -621,9 +621,10 @@ To get more information at any time, run:

if err != nil {
if _, ok := err.(*bacerrors.JobNotFound); ok {
Fatal(cmd, fmt.Sprintf("Somehow even though we submitted a job successfully, we were not able to get its status. ID: %s", j.ID), 1)
Fatal(cmd, fmt.Sprintf(`Somehow even though we submitted a job successfully,
we were not able to get its status. ID: %s`, j.Metadata.ID), 1)
} else {
Fatal(cmd, fmt.Sprintf("Unknown error trying to get job (ID: %s): %+v", j.ID, err), 1)
Fatal(cmd, fmt.Sprintf("Unknown error trying to get job (ID: %s): %+v", j.Metadata.ID, err), 1)
}
}

Expand All @@ -648,7 +649,7 @@ To get more information at any time, run:
signalChan <- syscall.SIGINT
break
} else {
jobEvents, err = GetAPIClient().GetEvents(ctx, j.ID)
jobEvents, err = GetAPIClient().GetEvents(ctx, j.Metadata.ID)
if err != nil {
if _, ok := err.(*bacerrors.ContextCanceledError); ok {
// We're done, the user canceled the job
Expand Down
Loading

0 comments on commit 7a107b3

Please sign in to comment.