Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(delayed jobs): adding delayed jobs implementation #5

Merged
merged 13 commits into from
Sep 13, 2024

Conversation

thoven87
Copy link
Contributor

@thoven87 thoven87 commented Sep 6, 2024

First pass at implementing delayed Jobs

Copy link

codecov bot commented Sep 6, 2024

Codecov Report

Attention: Patch coverage is 96.00000% with 1 line in your changes missing coverage. Please review.

Please upload report for BASE (main@db8b905). Learn more about missing BASE report.

Files with missing lines Patch % Lines
Sources/JobsRedis/RedisJobQueue.swift 96.00% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main       #5   +/-   ##
=======================================
  Coverage        ?   89.68%           
=======================================
  Files           ?        3           
  Lines           ?      126           
  Branches        ?        0           
=======================================
  Hits            ?      113           
  Misses          ?       13           
  Partials        ?        0           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Package.swift Outdated Show resolved Hide resolved
Copy link
Member

@adam-fowler adam-fowler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let me get this right

  • "delayedZSet::JobId" stores the job buffer and a score (basically the delay in seconds)
  • "delayedQueue" stores a queue of delayed job ids
    Does this mean we have a zset for each queued job that is delayed, with one element in it? It appears you pull a job off the delayedQueue, then check if its associated sorted set score returns a value, if it doesn't you push it back onto the delayedQueue.

You could implement something similar by pushing jobID and delay into the standard queue, and when you pop off the standard queue compare the date with Date.now and if it is not ready push back on standard queue. This would remove the need for the additional sorted key and delayed queue.

Alternatively we could just have one sorted key which stores all the delayed job ids with a score based off the delay. Then each 10 or so ticks (this is not something we need to do on every call to popFirst) we can get the jobs off this queue whose score indicates they are no longer delayed and add them to the job queue.

The only problem with this is the read from sorted list and remove needs to be atomic. I can't find a redis command that does this. Maybe we could write a script to do that.

@thoven87
Copy link
Contributor Author

thoven87 commented Sep 7, 2024

So let me get this right

  • "delayedZSet::JobId" stores the job buffer and a score (basically the delay in seconds)
  • "delayedQueue" stores a queue of delayed job ids
    Does this mean we have a zset for each queued job that is delayed, with one element in it? It appears you pull a job off the delayedQueue, then check if its associated sorted set score returns a value, if it doesn't you push it back onto the delayedQueue.

That's correct Adam! I could use delayedZSet which stores all job buffer and then move jobs out as needed. this could guarantee no data loss as no jobs are removed until pool.zremrangebyscore is called. We will not have access to the job id anymore.

You could implement something similar by pushing jobID and delay into the standard queue, and when you pop off the standard queue compare the date with Date.now and if it is not ready push back on standard queue. This would remove the need for the additional sorted key and delayed queue.

I think using the sorted key here is beneficial because it affords us to implement priority queue with less work.

Alternatively we could just have one sorted key which stores all the delayed job ids with a score based off the delay. Then each 10 or so ticks (this is not something we need to do on every call to popFirst) we can get the jobs off this queue whose score indicates they are no longer delayed and add them to the job queue.

The only problem with this is the read from sorted list and remove needs to be atomic. I can't find a redis command that does this. Maybe we could write a script to do that.

Indeed, we could write a Lua script to do just that. I think a raised concerns about implementing such logic outside of swift.

I was looking to see if there's a watch command for RedisStack. Maybe this could be used?

@thoven87
Copy link
Contributor Author

thoven87 commented Sep 7, 2024

You could implement something similar by pushing jobID and delay into the standard queue, and when you pop off the standard queue compare the date with Date.now and if it is not ready push back on standard queue. This would remove the need for the additional sorted key and delayed queue.

This would require to have the delayUtil property set on the job instance. Moreover, the redis implementation would need to decode check date then encode if not read and push back no?

Could you please share what you had in mind?

@adam-fowler
Copy link
Member

adam-fowler commented Sep 7, 2024

This would require to have the delayUtil property set on the job instance. Moreover, the redis implementation would need to decode check date then encode if not read and push back no?

As with the Postgres and memory implementations the delayedUntil parameter only has to be stored in the queue. The queue currently only stores a job id. Extending it to include the delayedUntil value wouldn't be hard.

@thoven87
Copy link
Contributor Author

thoven87 commented Sep 7, 2024

This would require to have the delayUtil property set on the job instance. Moreover, the redis implementation would need to decode check date then encode if not read and push back no?

As with the Postgres and memory implementations the delayedUntil parameter only has to be stored in the queue. The queue currently only stores a job id. Extending it to include the delayedUntil value wouldn't be hard.

Hmm, something like this queue -> ["jobID::delayUntilValue"]?

@adam-fowler
Copy link
Member

The queue entries should be of the form

{"id":String,"delay":Int?}

Where id is the job id and delay is the delay until value as seconds from 1970.

@thoven87
Copy link
Contributor Author

thoven87 commented Sep 7, 2024

The queue entries should be of the form

{"id":String,"delay":Int?}

Where id is the job id and delay is the delay until value as seconds from 1970.

I switched to using the delay job queue. I think we are all set since this version will handle migration just fine.

@thoven87
Copy link
Contributor Author

thoven87 commented Sep 9, 2024

What are the next steps to get this PR merged? @adam-fowler

…ing queue, use the current job key for item to be deleted from the processing queue
@adam-fowler
Copy link
Member

Let me have a look at it tomorrow. Sorry I haven't got to this. I've been quite busy

Copy link
Member

@adam-fowler adam-fowler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general looks goods, I've a couple of comments

Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Show resolved Hide resolved
Copy link
Member

@adam-fowler adam-fowler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delay still has to be recorded as an Int. You could also use the reference date (2000 I think) instead of 1970. You could probably hide all of this inside of JobID instead of spreading the conversions across different points in the code.

@thoven87
Copy link
Contributor Author

Delay still has to be recorded as an Int. You could also use the reference date (2000 I think) instead of 1970. You could probably hide all of this inside of JobID instead of spreading the conversions across different points in the code.

I will hide the logic inside of JobID instead.

Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Sources/JobsRedis/RedisJobQueue.swift Outdated Show resolved Hide resolved
Copy link
Member

@adam-fowler adam-fowler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made one change to get this over the line. I removed the getDelayed function as it isn't used.

@thoven87 thoven87 merged commit 0470843 into hummingbird-project:main Sep 13, 2024
6 of 7 checks passed
@thoven87 thoven87 deleted the stevenson/delayed-jobs branch September 13, 2024 10:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants