Skip to content

Commit

Permalink
Update for Queues 1.0.0 and custom JobIdentifier
Browse files Browse the repository at this point in the history
  • Loading branch information
m-barthelemy committed Apr 17, 2020
1 parent a627a33 commit 8f2ecda
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 59 deletions.
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ let package = Package(
targets: ["QueuesFluentDriver"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.1"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.0.0-rc.2"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.0.0-rc"),
.package(url: "https://github.com/vapor/queues.git", from: "1.0.0-rc"),
.package(url: "https://github.com/vapor/queues.git", from: "1.0.0"),
],
targets: [
.target(
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ let package = Package(
...
dependencies: [
...
.package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.2.4"),
.package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.3"),
...
],
targets: [
Expand Down
31 changes: 11 additions & 20 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ extension FluentQueue: Queue {
guard let database = db else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
guard let uuid = UUID(uuidString: id.string) else {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
return database.query(JobModel.self)
.filter(\.$id == uuid)
.filter(\.$jobId == id.string)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing { job in
Expand All @@ -32,24 +29,19 @@ extension FluentQueue: Queue {
guard let database = db else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
guard let uuid = UUID(uuidString: id.string) else {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
//let data = try! JSONEncoder().encode(jobStorage)
return JobModel(id: uuid, key: key, data: jobStorage).save(on: database)
//.map { return }
return JobModel(jobId: id.string, queue: queueName.string, data: jobStorage)
.save(on: database)
}

func clear(_ id: JobIdentifier) -> EventLoopFuture<Void> {
guard let database = db else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
guard let uuid = UUID(uuidString: id.string) else {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
// This does the equivalent of a Fluent Softdelete but sets the `state` to `completed`
return database.query(JobModel.self)
.filter(\.$id == uuid)
.filter(\.$jobId == id.string)
.filter(\.$state != QueuesFluentJobState.completed)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMap { job in
Expand All @@ -67,17 +59,15 @@ extension FluentQueue: Queue {
guard let database = db else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
guard let uuid = UUID(uuidString: id.string) else {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
let sqlDb = database as! SQLDatabase
return sqlDb
.update(JobModel.schema)
.set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
.where (SQLColumn("\(FieldKey.id)"), .equal, SQLBind(uuid))
.where (SQLColumn("\(FieldKey.jobId)"), .equal, SQLBind(id.string))
.run()
}

/// Currently selects the oldest job pending execution
func pop() -> EventLoopFuture<JobIdentifier?> {
guard let database = db else {
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
Expand All @@ -86,9 +76,10 @@ extension FluentQueue: Queue {

var selectQuery = db
.select ()
.column ("\(FieldKey.id)")
.column ("\(FieldKey.jobId)")
.from (JobModel.schema)
.where ("\(FieldKey.state)", .equal, SQLBind(QueuesFluentJobState.pending))
.where ("\(FieldKey.queue)", .equal, SQLBind(self.queueName.string))
.orderBy("\(FieldKey.createdAt)")
.limit (1)
if self.dbType != .sqlite {
Expand All @@ -105,7 +96,7 @@ extension FluentQueue: Queue {
popProvider = SqlitePop()
}
return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in
return JobIdentifier(string: id.uuidString)
return JobIdentifier(string: id)
}
}

Expand All @@ -120,7 +111,7 @@ extension FluentQueue: Queue {
.from (JobModel.schema)
.where ("\(FieldKey.state)", .equal, SQLBind(state))
if let queue = queue {
query = query.where("\(FieldKey.key)", .equal, SQLBind(queue))
query = query.where("\(FieldKey.queue)", .equal, SQLBind(queue))
}
if self.dbType != .sqlite {
query = query.lockingClause(SQLSkipLocked.forShareSkipLocked)
Expand Down
23 changes: 12 additions & 11 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import Fluent
import Queues

public enum QueuesFluentJobState: String, Codable, CaseIterable {
/// Ready to be oicked up for execution
/// Ready to be picked up for execution
case pending
case processing
/// Executed, regardless if it was successful or not
case completed
}

extension FieldKey {
static var key: Self { "key" }
static var jobId: Self { "job_id" }
static var queue: Self { "queue" }
static var data: Self { "data" }
static var state: Self { "state" }

Expand All @@ -23,16 +24,18 @@ extension FieldKey {
class JobModel: Model {
public required init() {}

/// Properties
public static var schema = "jobs"
public static var schema = "_jobs"

/// The unique Job uuid
@ID(key: .id)
var id: UUID?

@Field(key: .jobId)
var jobId: String?

/// The Job key
@Field(key: .key)
var key: String
@Field(key: .queue)
var queue: String

/// The Job data
@Field(key: .data)
Expand All @@ -43,21 +46,19 @@ class JobModel: Model {
@Field(key: .state)
var state: QueuesFluentJobState

/// The created timestamp
@Timestamp(key: .createdAt, on: .create)
var createdAt: Date?

/// The updated timestamp
@Timestamp(key: .updatedAt, on: .update)
var updatedAt: Date?

@Timestamp(key: .deletedAt, on: .delete)
var deletedAt: Date?


init(id: UUID, key: String, data: JobData) {
self.id = id
self.key = key
init(jobId: String, queue: String, data: JobData) {
self.jobId = jobId
self.queue = queue
self.data = try! JSONEncoder().encode(data)
self.state = .pending
}
Expand Down
20 changes: 17 additions & 3 deletions Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,34 @@ public struct JobModelMigrate: Migration {
public func prepare(on database: Database) -> EventLoopFuture<Void> {
return database.schema(JobModel.schema)
.id()
.field(FieldKey.key, .string, .required)
.field(FieldKey.jobId, .string, .required)
.field(FieldKey.queue, .string, .required)
.field(FieldKey.data, .data, .required)
.field(FieldKey.state, .string, .required)
.field(FieldKey.createdAt, .datetime)
.field(FieldKey.updatedAt, .datetime)
.field(FieldKey.deletedAt, .datetime)
.create()
.flatMap {
// Mysql could lock the entire table if there's no index on the field of the WHERE clause
// Mysql could lock the entire table if there's no index on the fields of the WHERE clause.
// Since we have both state and queue in the WHERE clause to retrieve pending jobs,
// both need to be part of a composite index.
// Order of the fields in the composite index and order of the fields in the WHERE clauses should match.
// Or I got totally confused reading their doc, which is also a possibility.
// Postgres seems to not be so sensitive and should be happy with the following indices.
let sqlDb = database as! SQLDatabase
return sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)")
let stateIndex = sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)_\(FieldKey.queue)")
.on(JobModel.schema)
.column("\(FieldKey.state)")
.column("\(FieldKey.queue)")
.run()
let jobIdIndex = sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.jobId)")
.on(JobModel.schema)
.column("\(FieldKey.jobId)")
.run()
return stateIndex.and(jobIdIndex).map { indices in
return
}
}
}

Expand Down
20 changes: 12 additions & 8 deletions Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@ import Fluent
import Queues

final class MySQLPop : PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?> {
// MySQL is a bit challenging since it doesn't support updating a table that is
// used in a subquery.
// So we first select, then update, with the whole process wrapped in a transaction.
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<String?> {
db.transaction { transaction in
let database = transaction as! SQLDatabase
var id: UUID?
var id: String?

return database.execute(sql: select) { (row) -> Void in
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self)
}
.flatMap {
if let id = id {
let updateQuery = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id))
.update (JobModel.schema)
.set (SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set (SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where (SQLColumn.init("\(FieldKey.jobId)"), .equal, SQLBind.init(id))
.where (SQLColumn.init("\(FieldKey.state)"), .equal, SQLBind.init(QueuesFluentJobState.pending))
.query
return database.execute(sql: updateQuery) { (row) in }
.map { id }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import SQLKit
import Fluent

protocol PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?>
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<String?>
}
10 changes: 5 additions & 5 deletions Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ import Fluent
import Queues

final class PostgresPop : PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?> {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<String?> {
let database = db as! SQLDatabase
let subQueryGroup = SQLGroupExpression.init(select)
let query = database
.update (JobModel.schema)
.set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing))
.set (SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date()))
.where (
SQLBinaryExpression(left: SQLColumn("\(FieldKey.id)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
SQLBinaryExpression(left: SQLColumn("\(FieldKey.jobId)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
)
// Gross abuse
.orWhere(SQLReturning.returning(column: FieldKey.id))
.orWhere(SQLReturning.returning(column: FieldKey.jobId))
.query

var id: UUID?
var id: String?
return database.execute(sql: query) { (row) -> Void in
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self)
}.map {
return id
}
Expand Down
15 changes: 8 additions & 7 deletions Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ import Fluent
import Queues

final class SqlitePop : PopQueryProtocol {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<UUID?> {
func pop(db: Database, select: SQLExpression) -> EventLoopFuture<String?> {
let database = db as! SQLDatabase
//let beginImmediateTrxn = database.raw("BEGIN IMMEDIATE").

//return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in
var id: UUID?
var id: String?
return database.execute(sql: select) { (row) -> Void in
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self)
}
.flatMap {
if let id = id {
let updateQuery = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id))
.update (JobModel.schema)
.set (SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set (SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where (SQLColumn.init("\(FieldKey.jobId)"), .equal, SQLBind.init(id))
.where (SQLColumn.init("\(FieldKey.state)"), .equal, SQLBind.init(QueuesFluentJobState.pending))
.query
return database.execute(sql: updateQuery) { (row) in }
.flatMap {
Expand Down
4 changes: 3 additions & 1 deletion Sources/QueuesFluentDriver/SQLExpressionExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ enum SQLSkipLocked: SQLExpression {
case .forUpdateSkipLocked:
serializer.write("FOR UPDATE SKIP LOCKED")
case .forShareSkipLocked:
// This is the "lightest" locking that is supported by both Postgres and Mysql
serializer.write("FOR SHARE SKIP LOCKED")
}
}
Expand All @@ -26,7 +27,8 @@ enum SQLReturning: SQLExpression {
case .returningAll:
serializer.write("1=2 RETURNING *")
case .returning(let column):
serializer.write("1=2 RETURNING \"\(column.description)\"")
serializer.write("1=2 RETURNING ")
SQLColumn(column.description).serialize(to: &serializer)
}
}
}

0 comments on commit 8f2ecda

Please sign in to comment.