Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move job serialisation to driver #22

Merged
merged 4 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ let package = Package(
.library(name: "JobsRedis", targets: ["JobsRedis"])
],
dependencies: [
// TODO: use a released version
.package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"),
.package(url: "https://github.com/swift-server/RediStack.git", from: "1.6.2"),
],
Expand Down
85 changes: 50 additions & 35 deletions Sources/JobsRedis/RedisJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
self.redisConnectionPool = .init(redisConnectionPool)
self.configuration = configuration
self.isStopped = .init(false)
self.jobRegistry = .init()
}

/// Cleanup job queues
Expand Down Expand Up @@ -149,58 +150,65 @@
try await self.initQueue(queueKey: self.configuration.failedQueueKey, onInit: failedJobs)
}

/// Register job
/// - Parameters:
/// - job: Job Definition
public func registerJob<Parameters: Codable & Sendable>(_ job: JobDefinition<Parameters>) {
self.jobRegistry.registerJob(job)
}

/// Push job data onto queue
/// - Parameters:
/// - buffer: Encoded Job data
/// - options: Job options
/// - Returns: Job ID
@discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID {
@discardableResult public func push<Parameters>(_ jobRequest: JobRequest<Parameters>, options: JobOptions) async throws -> JobID {
let jobInstanceID = JobID()
try await self.push(jobID: jobInstanceID, buffer: buffer, options: options)
try await self.push(jobID: jobInstanceID, jobRequest: jobRequest, options: options)
return jobInstanceID
}

/// Helper for enqueuing jobs
private func push(jobID: JobID, buffer: ByteBuffer, options: JobOptions) async throws {
let pendingJobID = PendingJobID(jobID: jobID, delayUntil: options.delayUntil)
try await self.addToQueue(pendingJobID, buffer: buffer)
}

/// Retry job data onto queue
/// - Parameters:
/// - id: JobID
/// - buffer: Encoded Job data
/// - jobRequest: Job request
/// - options: JobOptions
/// - Returns: Bool
@discardableResult public func retry(_ id: JobID, buffer: NIOCore.ByteBuffer, options: Jobs.JobOptions) async throws -> Bool {
try await self.finished(jobId: id)
try await self.push(jobID: id, buffer: buffer, options: options)
return true
public func retry<Parameters>(_ id: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
try await self.finished(jobID: id)
try await self.push(jobID: id, jobRequest: jobRequest, options: options)
}

/// Helper for enqueuing jobs
private func push<Parameters>(jobID: JobID, jobRequest: JobRequest<Parameters>, options: JobOptions) async throws {
let buffer = try self.jobRegistry.encode(jobRequest: jobRequest)
let pendingJobID = PendingJobID(jobID: jobID, delayUntil: options.delayUntil)
try await self.addToQueue(pendingJobID, buffer: buffer)
}

private func addToQueue(_ pendingJobID: PendingJobID, buffer: ByteBuffer) async throws {
try await self.set(jobId: pendingJobID.jobID, buffer: buffer)
try await self.set(jobID: pendingJobID.jobID, buffer: buffer)
_ = try await self.redisConnectionPool.wrappedValue.lpush(pendingJobID, into: self.configuration.queueKey).get()
}

/// Flag job is done
///
/// Removes job id from processing queue
/// - Parameters:
/// - jobId: Job id
public func finished(jobId: JobID) async throws {
_ = try await self.redisConnectionPool.wrappedValue.lrem(jobId.description, from: self.configuration.processingQueueKey, count: 0).get()
try await self.delete(jobId: jobId)
/// - jobID: Job id
public func finished(jobID: JobID) async throws {
_ = try await self.redisConnectionPool.wrappedValue.lrem(jobID.description, from: self.configuration.processingQueueKey, count: 0).get()
try await self.delete(jobID: jobID)
}

/// Flag job failed to process
///
/// Removes job id from processing queue, adds to failed queue
/// - Parameters:
/// - jobId: Job id
public func failed(jobId: JobID, error: Error) async throws {
_ = try await self.redisConnectionPool.wrappedValue.lrem(jobId.redisKey, from: self.configuration.processingQueueKey, count: 0).get()
_ = try await self.redisConnectionPool.wrappedValue.lpush(jobId.redisKey, into: self.configuration.failedQueueKey).get()
/// - jobID: Job id
public func failed(jobID: JobID, error: Error) async throws {
_ = try await self.redisConnectionPool.wrappedValue.lrem(jobID.redisKey, from: self.configuration.processingQueueKey, count: 0).get()
_ = try await self.redisConnectionPool.wrappedValue.lpush(jobID.redisKey, into: self.configuration.failedQueueKey).get()
}

public func stop() async {
Expand All @@ -227,7 +235,7 @@
/// Pop Job off queue and add to pending queue
/// - Parameter eventLoop: eventLoop to do work on
/// - Returns: queued job
func popFirst() async throws -> QueuedJob<JobID>? {
func popFirst() async throws -> JobQueueResult<JobID>? {
let pool = self.redisConnectionPool.wrappedValue
let pendingJobKey = try await pool.rpop(from: self.configuration.queueKey).get()
guard !pendingJobKey.isNull else {
Expand All @@ -246,10 +254,15 @@

_ = try await pool.lpush(jobID.redisKey, into: self.configuration.processingQueueKey).get()

if let buffer = try await self.get(jobId: jobID) {
return .init(id: jobID, jobBuffer: buffer)
if let buffer = try await self.get(jobID: jobID) {
do {
let jobInstance = try self.jobRegistry.decode(buffer)
return .init(id: jobID, result: .success(jobInstance))
} catch let error as JobQueueError {
return .init(id: jobID, result: .failure(error))
}
} else {
throw RedisQueueError.jobMissing(jobID)
return .init(id: jobID, result: .failure(JobQueueError(code: .unrecognisedJobId, jobName: nil)))

Check warning on line 265 in Sources/JobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsRedis/RedisJobQueue.swift#L265

Added line #L265 was not covered by tests
}
}

Expand Down Expand Up @@ -277,7 +290,7 @@
guard let key = PendingJobID(fromRESP: key) else {
throw RedisQueueError.unexpectedRedisKeyType
}
try await self.delete(jobId: key.jobID)
try await self.delete(jobID: key.jobID)

Check warning on line 293 in Sources/JobsRedis/RedisJobQueue.swift

View check run for this annotation

Codecov / codecov/patch

Sources/JobsRedis/RedisJobQueue.swift#L293

Added line #L293 was not covered by tests
}
default:
break
Expand Down Expand Up @@ -305,26 +318,28 @@
throw RedisQueueError.unexpectedRedisKeyType
}
let identifier = JobID(value: key)
try await self.delete(jobId: identifier)
try await self.delete(jobID: identifier)
}
}

func get(jobId: JobID) async throws -> ByteBuffer? {
try await self.redisConnectionPool.wrappedValue.get(jobId.redisKey).get().byteBuffer
func get(jobID: JobID) async throws -> ByteBuffer? {
try await self.redisConnectionPool.wrappedValue.get(jobID.redisKey).get().byteBuffer
}

func set(jobId: JobID, buffer: ByteBuffer) async throws {
try await self.redisConnectionPool.wrappedValue.set(jobId.redisKey, to: buffer).get()
func set(jobID: JobID, buffer: ByteBuffer) async throws {
try await self.redisConnectionPool.wrappedValue.set(jobID.redisKey, to: buffer).get()
}

func delete(jobId: JobID) async throws {
_ = try await self.redisConnectionPool.wrappedValue.delete(jobId.redisKey).get()
func delete(jobID: JobID) async throws {
_ = try await self.redisConnectionPool.wrappedValue.delete(jobID.redisKey).get()
}

let jobRegistry: JobRegistry
}

/// extend RedisJobQueue to conform to AsyncSequence
extension RedisJobQueue {
public typealias Element = QueuedJob<JobID>
public typealias Element = JobQueueResult<JobID>
public struct AsyncIterator: AsyncIteratorProtocol {
let queue: RedisJobQueue

Expand Down
Loading