Skip to content

Commit

Permalink
Fix PipelineTests
Browse files Browse the repository at this point in the history
  • Loading branch information
finestructure committed Sep 30, 2024
1 parent 5e1ce7d commit a6e1fa6
Showing 1 changed file with 68 additions and 30 deletions.
98 changes: 68 additions & 30 deletions Tests/AppTests/PipelineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import XCTest

@testable import App

import Dependencies
import SQLKit
import Vapor
import XCTest


// Tests concerning the full pipeline of operations:
Expand All @@ -31,21 +33,29 @@ class PipelineTests: AppTestCase {
Package(url: "1", status: .ok, processingStage: .reconciliation),
Package(url: "2", status: .ok, processingStage: .reconciliation),
].save(on: app.db)
// fast forward our clock by the deadtime interval
Current.date = { Date().addingTimeInterval(Constants.reIngestionDeadtime) }
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["1", "2"])

try await withDependencies {
// fast forward our clock by the deadtime interval
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["1", "2"])
}
}

func test_fetchCandidates_ingestion_limit() async throws {
try await [
Package(url: "1", status: .ok, processingStage: .reconciliation),
Package(url: "2", status: .ok, processingStage: .reconciliation),
].save(on: app.db)
// fast forward our clock by the deadtime interval
Current.date = { Date().addingTimeInterval(Constants.reIngestionDeadtime) }
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 1)
XCTAssertEqual(batch.map(\.model.url), ["1"])

try await withDependencies {
// fast forward our clock by the deadtime interval
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 1)
XCTAssertEqual(batch.map(\.model.url), ["1"])
}
}

func test_fetchCandidates_ingestion_correct_stage() async throws {
Expand All @@ -55,8 +65,13 @@ class PipelineTests: AppTestCase {
Package(url: "2", status: .ok, processingStage: .reconciliation),
Package(url: "3", status: .ok, processingStage: .analysis),
].save(on: app.db)
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2"])

try await withDependencies {
$0.date.now = .now
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2"])
}
}

func test_fetchCandidates_ingestion_prefer_new() async throws {
Expand All @@ -66,10 +81,14 @@ class PipelineTests: AppTestCase {
Package(url: "2", status: .new, processingStage: .reconciliation),
Package(url: "3", status: .ok, processingStage: .reconciliation),
].save(on: app.db)
// fast forward our clock by the deadtime interval
Current.date = { Date().addingTimeInterval(Constants.reIngestionDeadtime) }
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2", "1", "3"])

try await withDependencies {
// fast forward our clock by the deadtime interval
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2", "1", "3"])
}
}

func test_fetchCandidates_ingestion_eventual_refresh() async throws {
Expand All @@ -83,8 +102,13 @@ class PipelineTests: AppTestCase {
try await (app.db as! SQLDatabase).raw(
"update packages set updated_at = updated_at - interval '91 mins' where id = \(bind: p2.id)"
).run()
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2"])

try await withDependencies {
$0.date.now = .now
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["2"])
}
}

func test_fetchCandidates_ingestion_refresh_analysis_only() async throws {
Expand All @@ -99,10 +123,14 @@ class PipelineTests: AppTestCase {
Package(url: "2", status: .new, processingStage: .ingestion),
Package(url: "3", status: .new, processingStage: .analysis),
].save(on: app.db)
// fast forward our clock by the deadtime interval
Current.date = { Date().addingTimeInterval(Constants.reIngestionDeadtime) }
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["1", "3"])

try await withDependencies {
// fast forward our clock by the deadtime interval
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
} operation: {
let batch = try await Package.fetchCandidates(app.db, for: .ingestion, limit: 10)
XCTAssertEqual(batch.map(\.model.url), ["1", "3"])
}
}

func test_fetchCandidates_analysis_correct_stage() async throws {
Expand Down Expand Up @@ -167,8 +195,12 @@ class PipelineTests: AppTestCase {
XCTAssertEqual(packages.map(\.isNew), [true, true, true])
}

// MUT - second stage
try await ingest(client: app.client, database: app.db, mode: .limit(10))
try await withDependencies {
$0.date.now = .now
} operation: {
// MUT - second stage
try await ingest(client: app.client, database: app.db, mode: .limit(10))
}

do { // validate
let packages = try await Package.query(on: app.db).sort(\.$url).all()
Expand Down Expand Up @@ -205,8 +237,12 @@ class PipelineTests: AppTestCase {
XCTAssertEqual(packages.map(\.isNew), [false, false, true])
}

// MUT - ingest again
try await ingest(client: app.client, database: app.db, mode: .limit(10))
try await withDependencies {
$0.date.now = .now
} operation: {
// MUT - ingest again
try await ingest(client: app.client, database: app.db, mode: .limit(10))
}

do { // validate - only new package moves to .ingestion stage
let packages = try await Package.query(on: app.db).sort(\.$url).all()
Expand All @@ -231,11 +267,13 @@ class PipelineTests: AppTestCase {
XCTAssertEqual(packages.map(\.isNew), [false, false, false])
}

// fast forward our clock by the deadtime interval
Current.date = { Date().addingTimeInterval(Constants.reIngestionDeadtime) }

// MUT - ingest yet again
try await ingest(client: app.client, database: app.db, mode: .limit(10))
try await withDependencies {
// fast forward our clock by the deadtime interval
$0.date.now = .now.addingTimeInterval(Constants.reIngestionDeadtime)
} operation: {
// MUT - ingest yet again
try await ingest(client: app.client, database: app.db, mode: .limit(10))
}

do { // validate - now all three packages should have been updated
let packages = try await Package.query(on: app.db).sort(\.$url).all()
Expand Down

0 comments on commit a6e1fa6

Please sign in to comment.