diff --git a/client.go b/client.go index c15514b..8ce0d02 100644 --- a/client.go +++ b/client.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "net/url" "strconv" @@ -151,13 +150,13 @@ func (r *Client[P, S]) Enqueue(ctx context.Context, job Job[P, S]) error { case http.StatusAccepted: return nil case http.StatusConflict: - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return ErrJobExits{message: unsafeext.BytesToString(b)} default: if httpext.IsRetryableStatusCode(resp.StatusCode) { return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)} } - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Newf("error: %s", unsafeext.BytesToString(b)) } } @@ -185,13 +184,13 @@ func (r *Client[P, S]) EnqueueBatch(ctx context.Context, jobs []Job[P, S]) error case http.StatusAccepted: return nil case http.StatusConflict: - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return ErrJobExits{message: unsafeext.BytesToString(b)} default: if httpext.IsRetryableStatusCode(resp.StatusCode) { return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)} } - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Newf("error: %s", unsafeext.BytesToString(b)) } } @@ -255,8 +254,8 @@ func (r *Client[P, S]) Next(ctx context.Context, queue string, num_jobs uint32) // Remove removes the Job from the DB for processing. In fact this function makes a call to the complete endpoint. // // NOTE: It does not matter if the Job is in-flight or not it will be removed. All relevant code paths return an -// ErrNotFound to handle such events within Job Workers so that they can bail gracefully if desired. // +// ErrNotFound to handle such events within Job Workers so that they can bail gracefully if desired. func (r *Client[P, S]) Remove(ctx context.Context, queue, jobID string) error { values := make(url.Values) values.Set("job_id", jobID) @@ -280,13 +279,13 @@ func (r *Client[P, S]) Remove(ctx context.Context, queue, jobID string) error { case http.StatusOK: return nil case http.StatusNotFound: - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return ErrNotFound{message: unsafeext.BytesToString(b)} default: if httpext.IsRetryableStatusCode(resp.StatusCode) { return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)} } - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Newf("error: %s", unsafeext.BytesToString(b)) } } @@ -355,7 +354,8 @@ func (j *JobHelper[P, S]) HeartbeatAuto(ctx context.Context, interval time.Durat // Heartbeat calls the Job Runners heartbeat endpoint to keep the job alive. // Optional: It optionally accepts a state payload if desired to be used in case of failure for -// point-in-time restarting. +// +// point-in-time restarting. func (j *JobHelper[P, S]) Heartbeat(ctx context.Context, state *S) error { var err error @@ -393,13 +393,13 @@ func (j *JobHelper[P, S]) Heartbeat(ctx context.Context, state *S) error { case http.StatusAccepted: return nil case http.StatusNotFound: - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return ErrNotFound{message: unsafeext.BytesToString(b)} default: if httpext.IsRetryableStatusCode(resp.StatusCode) { return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)} } - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Newf("error: %s", unsafeext.BytesToString(b)) } } @@ -427,13 +427,13 @@ func (j *JobHelper[P, S]) Reschedule(ctx context.Context, job Job[P, S]) error { case http.StatusAccepted: return nil case http.StatusNotFound: - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return ErrNotFound{message: unsafeext.BytesToString(b)} default: if httpext.IsRetryableStatusCode(resp.StatusCode) { return retryableErr{err: errors.Newf("Temporary error occurred %d", resp.StatusCode)} } - b, _ := ioutil.ReadAll(resp.Body) + b, _ := io.ReadAll(resp.Body) return errors.Newf("error: %s", unsafeext.BytesToString(b)) } } diff --git a/consumer/consumer.go b/consumer/consumer.go index 76c7a5d..db7bbd7 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -202,7 +202,9 @@ func (c *Consumer[P, S, T]) worker(ctx context.Context, ch <-chan *relay.JobHelp } func (c *Consumer[P, S, T]) process(ctx context.Context, helper *relay.JobHelper[P, S]) error { + ctx, cancel := context.WithCancel(ctx) defer func() { + cancel() select { case <-c.sem: default: