Skip to content

Commit

Permalink
handle context cancelation per processing Job
Browse files Browse the repository at this point in the history
  • Loading branch information
deankarn committed Aug 29, 2022
1 parent a5fbcf5 commit 31d7ddb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
26 changes: 13 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -151,13 +150,13 @@ func (r *Client[P, S]) Enqueue(ctx context.Context, job Job[P, S]) error {
case http.StatusAccepted:
return nil
case http.StatusConflict:
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return ErrJobExits{message: unsafeext.BytesToString(b)}
default:
if httpext.IsRetryableStatusCode(resp.StatusCode) {
return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)}
}
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return errors.Newf("error: %s", unsafeext.BytesToString(b))
}
}
Expand Down Expand Up @@ -185,13 +184,13 @@ func (r *Client[P, S]) EnqueueBatch(ctx context.Context, jobs []Job[P, S]) error
case http.StatusAccepted:
return nil
case http.StatusConflict:
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return ErrJobExits{message: unsafeext.BytesToString(b)}
default:
if httpext.IsRetryableStatusCode(resp.StatusCode) {
return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)}
}
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return errors.Newf("error: %s", unsafeext.BytesToString(b))
}
}
Expand Down Expand Up @@ -255,8 +254,8 @@ func (r *Client[P, S]) Next(ctx context.Context, queue string, num_jobs uint32)
// Remove removes the Job from the DB for processing. In fact this function makes a call to the complete endpoint.
//
// NOTE: It does not matter if the Job is in-flight or not it will be removed. All relevant code paths return an
// ErrNotFound to handle such events within Job Workers so that they can bail gracefully if desired.
//
// ErrNotFound to handle such events within Job Workers so that they can bail gracefully if desired.
func (r *Client[P, S]) Remove(ctx context.Context, queue, jobID string) error {
values := make(url.Values)
values.Set("job_id", jobID)
Expand All @@ -280,13 +279,13 @@ func (r *Client[P, S]) Remove(ctx context.Context, queue, jobID string) error {
case http.StatusOK:
return nil
case http.StatusNotFound:
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return ErrNotFound{message: unsafeext.BytesToString(b)}
default:
if httpext.IsRetryableStatusCode(resp.StatusCode) {
return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)}
}
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return errors.Newf("error: %s", unsafeext.BytesToString(b))
}
}
Expand Down Expand Up @@ -355,7 +354,8 @@ func (j *JobHelper[P, S]) HeartbeatAuto(ctx context.Context, interval time.Durat

// Heartbeat calls the Job Runners heartbeat endpoint to keep the job alive.
// Optional: It optionally accepts a state payload if desired to be used in case of failure for
// point-in-time restarting.
//
// point-in-time restarting.
func (j *JobHelper[P, S]) Heartbeat(ctx context.Context, state *S) error {

var err error
Expand Down Expand Up @@ -393,13 +393,13 @@ func (j *JobHelper[P, S]) Heartbeat(ctx context.Context, state *S) error {
case http.StatusAccepted:
return nil
case http.StatusNotFound:
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return ErrNotFound{message: unsafeext.BytesToString(b)}
default:
if httpext.IsRetryableStatusCode(resp.StatusCode) {
return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)}
}
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return errors.Newf("error: %s", unsafeext.BytesToString(b))
}
}
Expand Down Expand Up @@ -427,13 +427,13 @@ func (j *JobHelper[P, S]) Reschedule(ctx context.Context, job Job[P, S]) error {
case http.StatusAccepted:
return nil
case http.StatusNotFound:
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return ErrNotFound{message: unsafeext.BytesToString(b)}
default:
if httpext.IsRetryableStatusCode(resp.StatusCode) {
return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)}
}
b, _ := ioutil.ReadAll(resp.Body)
b, _ := io.ReadAll(resp.Body)
return errors.Newf("error: %s", unsafeext.BytesToString(b))
}
}
Expand Down
2 changes: 2 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (c *Consumer[P, S, T]) worker(ctx context.Context, ch <-chan *relay.JobHelp
}

func (c *Consumer[P, S, T]) process(ctx context.Context, helper *relay.JobHelper[P, S]) error {
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
select {
case <-c.sem:
default:
Expand Down

0 comments on commit 31d7ddb

Please sign in to comment.