Skip to content

Commit

Permalink
[batch/qob] propagates errors from failed QoB worker jobs without res…
Browse files Browse the repository at this point in the history
…ult files

Currently, when a QoB worker job fails in a way that prevents it from
writing a result file with the error's stack trace in it, the Batch
worker that started the job tries to get the result file anyway, and
when it fails, raises a 404. This change retrieves the stack trace from
the QoB job and raises an error with the stack trace included.
  • Loading branch information
iris-garden committed Nov 28, 2023
1 parent 8b498aa commit 1cad93a
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 183 deletions.
333 changes: 169 additions & 164 deletions batch/jvm-entryway/src/main/java/is/hail/JVMEntryway.java

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pytest-qob: upload-qob-jar upload-qob-test-resources install-editable
--durations=50 \
--self-contained-html \
--html=../build/reports/pytest.html \
--timeout=120 \
--timeout=1200 \
$(PYTEST_TARGET) \
$(PYTEST_ARGS)

Expand Down Expand Up @@ -316,8 +316,8 @@ upload-artifacts: $(WHEEL)
upload-qob-test-resources: $(shell git ls-files src/test/resources)
upload-qob-test-resources: $(shell git ls-files python/hail/docs/data)
! [ -z $(NAMESPACE) ] # call this like: make upload-qob-test-resources NAMESPACE=default
gcloud storage cp src/test/resources/\* $(CLOUD_HAIL_TEST_RESOURCES_DIR)
gcloud storage cp python/hail/docs/data/\* $(CLOUD_HAIL_DOCTEST_DATA_DIR)
gcloud storage cp -R src/test/resources/\* $(CLOUD_HAIL_TEST_RESOURCES_DIR)
gcloud storage cp -R python/hail/docs/data/\* $(CLOUD_HAIL_DOCTEST_DATA_DIR)
# # In Azure, use the following instead of gcloud storage cp
# python3 -m hailtop.aiotools.copy -vvv 'null' '[\
# {"from":"src/test/resources","to":"$(CLOUD_HAIL_TEST_RESOURCES_DIR)"},\
Expand Down
4 changes: 3 additions & 1 deletion hail/scripts/upload_qob_jar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ else
JAR_LOCATION="${TEST_STORAGE_URI}/${NAMESPACE}/jars/${TOKEN}/${REVISION}.jar"
fi

python3 -m hailtop.aiotools.copy -vvv 'null' '[{"from":"'${SHADOW_JAR}'", "to":"'${JAR_LOCATION}'"}]'
# FIXME revert when done testing
gcloud storage cp ${SHADOW_JAR} ${JAR_LOCATION}
# python3 -m hailtop.aiotools.copy -vvv 'null' '[{"from":"'${SHADOW_JAR}'", "to":"'${JAR_LOCATION}'"}]'
echo ${JAR_LOCATION} > ${PATH_FILE}
39 changes: 24 additions & 15 deletions hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ class ServiceBackend(

stageCount += 1
implicit val formats: Formats = DefaultFormats
val batchState = (batch \ "state").extract[String]
if (batchState == "failed") {
throw new HailBatchFailure(s"Update $updateId for batch $batchId failed")
}

log.info(s"parallelizeAndComputeWithIndex: $token: reading results")

Expand All @@ -281,16 +277,29 @@ class ServiceBackend(
collection.map { case (_, i) =>
(
() => {
val bytes = fs.readNoCompression(s"$root/result.$i")
if (bytes(0) != 0) {
bytes.slice(1, bytes.length)
} else {
val errorInformationBytes = bytes.slice(1, bytes.length)
val is = new DataInputStream(new ByteArrayInputStream(errorInformationBytes))
val shortMessage = readString(is)
val expandedMessage = readString(is)
val errorId = is.readInt()
throw new HailWorkerException(i, shortMessage, expandedMessage, errorId)
try {
val bytes = fs.readNoCompression(s"$root/result.$i")
if (bytes(0) != 0) {
bytes.slice(1, bytes.length)
} else {
val errorInformationBytes = bytes.slice(1, bytes.length)
val is = new DataInputStream(new ByteArrayInputStream(errorInformationBytes))
val shortMessage = readString(is)
val expandedMessage = readString(is)
val errorId = is.readInt()
throw new HailWorkerException(i, shortMessage, expandedMessage, errorId)
}
} catch {
case e: HailWorkerException => throw e
case e: Exception => {
val jobId = (jobs(i) \ "job_id").extract[Int]
val err = s"Update $updateId for batch $batchId failed due to error in job $jobId."
val (jobData, jobLog) = batchClient.getJob(batchId, jobId)
(jobData \ "state").extract[String].toLowerCase() match {
case "failed" => throw new HailWorkerException(i, err, s"""Log for job $jobId:\n\n${(jobLog \ "main").extract[String]}\n\n""", -1)
case _ => throw new HailBatchFailure(err, e)
}
}
}
},
i
Expand Down Expand Up @@ -407,7 +416,7 @@ class ServiceBackend(
}

class EndOfInputException extends RuntimeException
class HailBatchFailure(message: String) extends RuntimeException(message)
class HailBatchFailure(message: String, error: Exception) extends RuntimeException(message, error)

object ServiceBackendAPI {
private[this] val log = Logger.getLogger(getClass.getName())
Expand Down
2 changes: 2 additions & 0 deletions hail/src/main/scala/is/hail/backend/service/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ object Worker {
timer.end("executeFunction")
timer.start("writeOutputs")

/* TODO uncomment after testing
retryTransientErrors {
write(s"$root/result.$i") { dos =>
result match {
Expand All @@ -194,6 +195,7 @@ object Worker {
}
}
}
*/

timer.end("writeOutputs")
timer.end(s"Job $i")
Expand Down
10 changes: 10 additions & 0 deletions hail/src/main/scala/is/hail/services/DeployConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,14 @@ class DeployConfig(
def baseUrl(service: String, baseScheme: String = "http"): String = {
s"${ scheme(baseScheme) }://${ domain(service) }${ basePath(service) }"
}

def externalUrl(service: String, path: String, baseScheme: String = "http"): String = {
val ns = getServiceNamespace(service)
val prefix = s"${baseScheme}s://"
(ns, service) match {
case ("default", "www") => s"$prefix$domain$path"
case ("default", _) => s"$prefix$service$domain$path"
case _ => s"${prefix}internal.$domain/$ns/$service$path"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ class BatchClient(
throw new AssertionError("unreachable")
}

def getJob(batchId: Long, jobId: Int): (JValue, JValue) = {
(
get(s"/api/v1alpha/batches/$batchId/jobs/$jobId"),
get(s"/api/v1alpha/batches/$batchId/jobs/$jobId/log")
)
}

private def createBunches(jobs: IndexedSeq[JObject]): BoxedArrayBuilder[Array[Array[Byte]]] = {
val bunches = new BoxedArrayBuilder[Array[Array[Byte]]]()
val bunchb = new BoxedArrayBuilder[Array[Byte]]()
Expand Down

0 comments on commit 1cad93a

Please sign in to comment.