Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revisit /submit-processed-data #333

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defaults:
run:
working-directory: ./backend

concurrency:
group: ci-${{ github.ref }}-backend
cancel-in-progress: true

jobs:
Tests:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ name: e2e
on:
push:

concurrency:
group: ci-${{ github.ref }}-e2e
cancel-in-progress: true

jobs:
E2ETests:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/website.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defaults:
run:
working-directory: ./website

concurrency:
group: ci-${{ github.ref }}-website
cancel-in-progress: true

jobs:
checks:
name: Check format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ class ExceptionHandler : ResponseEntityExceptionHandler() {
)
}

@ExceptionHandler(ConstraintViolationException::class)
@ExceptionHandler(ConstraintViolationException::class, BadRequestException::class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
fun handleBadRequestException(e: ConstraintViolationException): ResponseEntity<ProblemDetail> {
log.warn(e) { "Caught ConstraintViolationException: ${e.message}" }
fun handleBadRequestException(e: Exception): ResponseEntity<ProblemDetail> {
log.warn(e) { "Caught ${e.javaClass}: ${e.message}" }

return responseEntity(
HttpStatus.BAD_REQUEST,
Expand Down Expand Up @@ -80,5 +80,6 @@ class ExceptionHandler : ResponseEntityExceptionHandler() {
}
}

class UnprocessableEntityException(message: String) : RuntimeException(message)
class BadRequestException(message: String, override val cause: Throwable? = null) : RuntimeException(message)
class ForbiddenException(message: String) : RuntimeException(message)
class UnprocessableEntityException(message: String) : RuntimeException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.pathoplexus.backend.controller
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.Parameter
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.media.ExampleObject
import io.swagger.v3.oas.annotations.media.Schema
import io.swagger.v3.oas.annotations.responses.ApiResponse
import jakarta.servlet.http.HttpServletRequest
Expand All @@ -15,10 +14,10 @@ import org.pathoplexus.backend.model.SubmitModel
import org.pathoplexus.backend.service.DatabaseService
import org.pathoplexus.backend.service.FileData
import org.pathoplexus.backend.service.OriginalData
import org.pathoplexus.backend.service.SequenceVersion
import org.pathoplexus.backend.service.SequenceValidation
import org.pathoplexus.backend.service.SequenceVersionStatus
import org.pathoplexus.backend.service.SubmittedProcessedData
import org.pathoplexus.backend.service.UnprocessedData
import org.pathoplexus.backend.service.ValidationResult
import org.pathoplexus.backend.utils.FastaReader
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
Expand Down Expand Up @@ -65,6 +64,18 @@ private const val SUBMIT_REVIEWED_SEQUENCE_DESCRIPTION =

private const val MAX_EXTRACTED_SEQUENCES = 100_000L

private const val SUBMIT_PROCESSED_DATA_DESCRIPTION = """
Submit processed data as a stream of NDJSON. The schema is to be understood per line of the NDJSON stream.
This endpoint performs some server side validation and returns the validation result for every submitted sequence.
Any server side validation errors will be appended to the 'errors' field of the sequence.
On a technical error, this endpoint will roll back all previously inserted data.
"""
private const val SUBMIT_PROCESSED_DATA_RESPONSE_DESCRIPTION = "Contains an entry for every submitted sequence."
private const val SUBMIT_PROCESSED_DATA_ERROR_RESPONSE_DESCRIPTION = """
On sequence version that cannot be written to the database, e.g. if the sequence id does not exist.
Rolls back the whole transaction.
"""

@RestController
@Validated
class SubmissionController(
Expand Down Expand Up @@ -115,34 +126,23 @@ class SubmissionController(
}

@Operation(
description = "Submit processed data as a stream of NDJSON",
description = SUBMIT_PROCESSED_DATA_DESCRIPTION,
requestBody = SwaggerRequestBody(
content = [
Content(
mediaType = MediaType.APPLICATION_NDJSON_VALUE,
schema = Schema(implementation = SequenceVersion::class),
examples = [
ExampleObject(
name = "Example for submitting processed sequences. \n" +
" NOTE: Due to formatting issues with swagger, remove all newlines from the example.",
value = """{"sequenceId":"4","version":"1",data":{"date":"2020-12-25","host":"Homo sapiens","region":"Europe","country":"Switzerland","division":"Schaffhausen", "nucleotideSequences":{"main":"NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNAGATC..."}}}""", // ktlint-disable max-line-length
summary = "Processed data (remove all newlines from the example)",
),
ExampleObject(
name = "Example for submitting processed sequences with errors. \n" +
" NOTE: Due to formatting issues with swagger, remove all newlines from the example.",
value = """{"sequenceId":"4","version":"1","data":{"errors":[{"field":"host",message:"Not that kind of host"}],"date":"2020-12-25","host":"google.com","region":"Europe","country":"Switzerland","division":"Schaffhausen", "nucleotideSequences":{"main":"NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNAGATC..."}}}""", // ktlint-disable max-line-length
summary = "Processed data with errors (remove all newlines from the example)",
),
],
schema = Schema(implementation = SubmittedProcessedData::class),
),
],
),
)
@ApiResponse(responseCode = "200", description = SUBMIT_PROCESSED_DATA_RESPONSE_DESCRIPTION)
@ApiResponse(responseCode = "400", description = "On invalid NDJSON line. Rolls back the whole transaction.")
@ApiResponse(responseCode = "422", description = SUBMIT_PROCESSED_DATA_ERROR_RESPONSE_DESCRIPTION)
@PostMapping("/submit-processed-data", consumes = [MediaType.APPLICATION_NDJSON_VALUE])
fun submitProcessedData(
request: HttpServletRequest,
): List<ValidationResult> {
): List<SequenceValidation> {
return databaseService.updateProcessedData(request.inputStream)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.pathoplexus.backend.service

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.JacksonException
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
Expand All @@ -27,6 +28,7 @@ import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.stringParam
import org.jetbrains.exposed.sql.update
import org.jetbrains.exposed.sql.wrapAsExpression
import org.pathoplexus.backend.controller.BadRequestException
import org.pathoplexus.backend.controller.ForbiddenException
import org.pathoplexus.backend.controller.UnprocessableEntityException
import org.pathoplexus.backend.model.HeaderId
Expand Down Expand Up @@ -130,64 +132,70 @@ class DatabaseService(
}
}

fun updateProcessedData(inputStream: InputStream): List<ValidationResult> {
fun updateProcessedData(inputStream: InputStream): List<SequenceValidation> {
log.info { "updating processed data" }
val reader = BufferedReader(InputStreamReader(inputStream))

val validationResults = mutableListOf<ValidationResult>()

reader.lineSequence().forEach { line ->
val sequenceVersion = objectMapper.readValue<SequenceVersion>(line)
val validationResult = sequenceValidatorService.validateSequence(sequenceVersion)

if (sequenceValidatorService.isValidResult(validationResult)) {
val numInserted = insertProcessedData(sequenceVersion)
if (numInserted != 1) {
validationResults.add(
ValidationResult(
sequenceVersion.sequenceId,
emptyList(),
emptyList(),
emptyList(),
listOf(insertProcessedDataError(sequenceVersion)),
),
)
}
} else {
validationResults.add(validationResult)
return reader.lineSequence().map { line ->
val submittedProcessedData = try {
objectMapper.readValue<SubmittedProcessedData>(line)
} catch (e: JacksonException) {
throw BadRequestException("Failed to deserialize NDJSON line: ${e.message}", e)
}
val validationResult = sequenceValidatorService.validateSequence(submittedProcessedData)

val numInserted = insertProcessedDataWithStatus(submittedProcessedData, validationResult)
if (numInserted != 1) {
throwInsertFailedException(submittedProcessedData)
}
}

return validationResults
SequenceValidation(submittedProcessedData.sequenceId, submittedProcessedData.version, validationResult)
}.toList()
}

private fun insertProcessedData(sequenceVersion: SequenceVersion): Int {
val newStatus = if (sequenceVersion.errors != null &&
sequenceVersion.errors.isArray &&
sequenceVersion.errors.size() > 0
) {
Status.NEEDS_REVIEW.name
} else {
Status.PROCESSED.name
}
private fun insertProcessedDataWithStatus(
submittedProcessedData: SubmittedProcessedData,
validationResult: ValidationResult,
): Int {
val now = Clock.System.now().toLocalDateTime(TimeZone.UTC)

val validationErrors = when (validationResult) {
is ValidationResult.Error -> validationResult.validationErrors
is ValidationResult.Ok -> emptyList()
}.map {
PreprocessingAnnotation(
listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.Metadata,
it.fieldName,
),
),
"${it.type}: ${it.message}",
)
}
val computedErrors = validationErrors + submittedProcessedData.errors.orEmpty()

val newStatus = when {
computedErrors.isEmpty() -> Status.PROCESSED
else -> Status.NEEDS_REVIEW
}

return SequencesTable.update(
where = {
(SequencesTable.sequenceId eq sequenceVersion.sequenceId) and
(SequencesTable.version eq sequenceVersion.version) and
(SequencesTable.sequenceId eq submittedProcessedData.sequenceId) and
(SequencesTable.version eq submittedProcessedData.version) and
(SequencesTable.status eq Status.PROCESSING.name)
},
) {
it[status] = newStatus
it[processedData] = sequenceVersion.data
it[errors] = sequenceVersion.errors
it[warnings] = sequenceVersion.warnings
it[status] = newStatus.name
it[processedData] = submittedProcessedData.data
it[errors] = computedErrors
it[warnings] = submittedProcessedData.warnings
it[finishedProcessingAt] = now
}
}

private fun insertProcessedDataError(sequenceVersion: SequenceVersion): String {
private fun throwInsertFailedException(submittedProcessedData: SubmittedProcessedData): String {
val selectedSequences = SequencesTable
.slice(
SequencesTable.sequenceId,
Expand All @@ -196,17 +204,24 @@ class DatabaseService(
)
.select(
where = {
(SequencesTable.sequenceId eq sequenceVersion.sequenceId) and
(SequencesTable.version eq sequenceVersion.version)
(SequencesTable.sequenceId eq submittedProcessedData.sequenceId) and
(SequencesTable.version eq submittedProcessedData.version)
},
)
if (selectedSequences.count().toInt() == 0) {
return "SequenceId does not exist"

val sequenceVersion = "${submittedProcessedData.sequenceId}.${submittedProcessedData.version}"
if (selectedSequences.count() == 0L) {
throw UnprocessableEntityException("Sequence version $sequenceVersion does not exist")
}
if (selectedSequences.any { it[SequencesTable.status] != Status.PROCESSING.name }) {
return "SequenceId is not in processing state"

val selectedSequence = selectedSequences.first()
if (selectedSequence[SequencesTable.status] != Status.PROCESSING.name) {
throw UnprocessableEntityException(
"Sequence version $sequenceVersion is in not in state ${Status.PROCESSING} " +
"(was ${selectedSequence[SequencesTable.status]})",
)
}
return "Unknown error"
throw RuntimeException("Update processed data: Unexpected error for sequence version $sequenceVersion")
}

fun approveProcessedData(submitter: String, sequenceIds: List<Long>) {
Expand Down Expand Up @@ -266,7 +281,7 @@ class DatabaseService(
(SequencesTable.version eq maxVersionQuery)
},
).limit(numberOfSequences).map { row ->
SequenceVersion(
SubmittedProcessedData(
row[SequencesTable.sequenceId],
row[SequencesTable.version],
row[SequencesTable.processedData]!!,
Expand Down Expand Up @@ -297,7 +312,7 @@ class DatabaseService(
(SequencesTable.submitter eq submitter)
},
).limit(numberOfSequences).map { row ->
SequenceVersion(
SubmittedProcessedData(
row[SequencesTable.sequenceId],
row[SequencesTable.version],
row[SequencesTable.processedData]!!,
Expand Down Expand Up @@ -558,14 +573,47 @@ class DatabaseService(
}
}

data class SequenceVersion(
data class SubmittedProcessedData(
fengelniederhammer marked this conversation as resolved.
Show resolved Hide resolved
val sequenceId: Long,
val version: Long,
val data: JsonNode,
val errors: JsonNode? = null,
val warnings: JsonNode? = null,
val data: ProcessedData,
@Schema(description = "The preprocessing will be considered failed if this is not empty")
val errors: List<PreprocessingAnnotation>? = null,
@Schema(
description =
"Issues where data is not necessarily wrong, but the submitter might want to look into those warnings.",
)
val warnings: List<PreprocessingAnnotation>? = null,
)

data class ProcessedData(
@Schema(
example = """{"date": "2020-01-01", "country": "Germany", "age": 42, "qc": 0.95}""",
description = "Key value pairs of metadata, correctly typed",
)
val metadata: Map<String, JsonNode>,
@Schema(
example = """{"segment1": "ACTG", "segment2": "GTCA"}""",
description = "The key is the segment name, the value is the nucleotide sequence",
)
val unalignedNucleotideSequences: Map<String, String>,
fengelniederhammer marked this conversation as resolved.
Show resolved Hide resolved
)

data class PreprocessingAnnotation(
val source: List<PreprocessingAnnotationSource>,
@Schema(description = "A descriptive message that helps the submitter to fix the issue") val message: String,
TobiasKampmann marked this conversation as resolved.
Show resolved Hide resolved
)

data class PreprocessingAnnotationSource(
val type: PreprocessingAnnotationSourceType,
@Schema(description = "Field or sequence segment name") val name: String,
)

enum class PreprocessingAnnotationSourceType {
Metadata,
fengelniederhammer marked this conversation as resolved.
Show resolved Hide resolved
NucleotideSequence,
}

data class SequenceVersionStatus(
val sequenceId: Long,
val version: Long,
Expand Down Expand Up @@ -603,6 +651,12 @@ data class OriginalData(
val unalignedNucleotideSequences: Map<String, String>,
)

data class SequenceValidation(
val sequenceId: Long,
val version: Long,
val validation: ValidationResult,
)

enum class Status {
@JsonProperty("RECEIVED")
RECEIVED,
Expand Down
Loading