Skip to content

Commit

Permalink
Merge pull request #572 from NDLANO/grep-dump-indexing
Browse files Browse the repository at this point in the history
search-api: Index grep with dump rather than calling endpoints
  • Loading branch information
jnatten authored Jan 14, 2025
2 parents 6503a5a + 8459834 commit 89f6a00
Show file tree
Hide file tree
Showing 22 changed files with 646 additions and 196 deletions.
9 changes: 6 additions & 3 deletions common/src/main/scala/no/ndla/common/CirceUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ import scala.util.{Failure, Try}
object CirceUtil {
// NOTE: Circe's `DecodingFailure` does not include a stack trace, so we wrap it in our own exception
// to make it more like other failures.
case class CirceFailure(message: String) extends RuntimeException(message)
case class CirceFailure(message: String, jsonString: String) extends RuntimeException(message)
object CirceFailure {
def apply(reason: Throwable): Throwable = new CirceFailure(reason.getMessage).initCause(reason)
def apply(jsonString: String, reason: Throwable): Throwable = {
val message = s"${reason.getMessage}\n$jsonString"
new CirceFailure(message, jsonString).initCause(reason)
}
}

def tryParseAs[T](str: String)(implicit d: Decoder[T]): Try[T] = {
parser
.parse(str)
.toTry
.flatMap(_.as[T].toTry)
.recoverWith { ex => Failure(CirceFailure(ex)) }
.recoverWith { ex => Failure(CirceFailure(str, ex)) }
}

/** This might throw an exception! Use with care, probably only use this in tests */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ trait DraftController {
.in("grep-codes")
.summary("Retrieves a list of all previously used grepCodes in articles")
.description("Retrieves a list of all previously used grepCodes in articles")
.deprecated()
.in(queryParam)
.in(pageSize)
.in(pageNo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ trait InternController {
val internController: InternController

class InternController extends TapirController with StrictLogging {
import props.{DraftSearchIndex, DraftTagSearchIndex, DraftGrepCodesSearchIndex}
import props.{DraftSearchIndex, DraftTagSearchIndex}

override val prefix: EndpointInput[Unit] = "intern"
override val enableSwagger = false
Expand Down Expand Up @@ -103,8 +103,7 @@ trait InternController {
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
val articleIndex = createIndexFuture(articleIndexService, numShards)
val tagIndex = createIndexFuture(tagIndexService, numShards)
val grepIndex = createIndexFuture(grepCodesIndexService, numShards)
val indexResults = Future.sequence(List(articleIndex, tagIndex, grepIndex))
val indexResults = Future.sequence(List(articleIndex, tagIndex))

Await.result(indexResults, Duration.Inf).sequence match {
case Failure(ex) =>
Expand All @@ -126,14 +125,12 @@ trait InternController {
val indexes = for {
articleIndex <- Future { articleIndexService.findAllIndexes(DraftSearchIndex) }
tagIndex <- Future { tagIndexService.findAllIndexes(DraftTagSearchIndex) }
grepIndex <- Future { grepCodesIndexService.findAllIndexes(DraftGrepCodesSearchIndex) }
} yield (articleIndex, tagIndex, grepIndex)
} yield (articleIndex, tagIndex)

val deleteResults: Seq[Try[_]] = Await.result(indexes, Duration(10, TimeUnit.MINUTES)) match {
case (Failure(articleFail), _, _) => return articleFail.getMessage.asLeft
case (_, Failure(tagFail), _) => return tagFail.getMessage.asLeft
case (_, _, Failure(grepFail)) => return grepFail.getMessage.asLeft
case (Success(articleIndexes), Success(tagIndexes), Success(grepIndexes)) =>
case (Failure(articleFail), _) => return articleFail.getMessage.asLeft
case (_, Failure(tagFail)) => return tagFail.getMessage.asLeft
case (Success(articleIndexes), Success(tagIndexes)) =>
val articleDeleteResults = articleIndexes.map(index => {
logger.info(s"Deleting article index $index")
articleIndexService.deleteIndexWithName(Option(index))
Expand All @@ -142,11 +139,7 @@ trait InternController {
logger.info(s"Deleting tag index $index")
tagIndexService.deleteIndexWithName(Option(index))
})
val grepDeleteResults = grepIndexes.map(index => {
logger.info(s"Deleting grep index $index")
grepCodesIndexService.deleteIndexWithName(Option(index))
})
articleDeleteResults ++ tagDeleteResults ++ grepDeleteResults
articleDeleteResults ++ tagDeleteResults
}

val (errors, successes) = deleteResults.partition(_.isFailure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ trait WriteService {
searchApiClient.indexDraft(article, user)(ec): Unit
articleIndexService.indexAsync(articleId, article)(ec): Unit
tagIndexService.indexAsync(articleId, article)(ec): Unit
grepCodesIndexService.indexAsync(articleId, article)(ec): Unit
Success(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,23 @@ class InternControllerTest extends UnitSuite with TestEnvironment with TapirCont
test("That DELETE /index removes all indexes") {
reset(
articleIndexService,
tagIndexService,
grepCodesIndexService
tagIndexService
)

when(articleIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index1", "index2")))
when(tagIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index7", "index8")))
when(grepCodesIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index9", "index10")))
doReturn(Success(""), Nil: _*).when(articleIndexService).deleteIndexWithName(Some("index1"))
doReturn(Success(""), Nil: _*).when(articleIndexService).deleteIndexWithName(Some("index2"))
doReturn(Success(""), Nil: _*).when(tagIndexService).deleteIndexWithName(Some("index7"))
doReturn(Success(""), Nil: _*).when(tagIndexService).deleteIndexWithName(Some("index8"))
doReturn(Success(""), Nil: _*).when(grepCodesIndexService).deleteIndexWithName(Some("index9"))
doReturn(Success(""), Nil: _*).when(grepCodesIndexService).deleteIndexWithName(Some("index10"))

{
val res = simpleHttpClient.send(
quickRequest
.delete(uri"http://localhost:$serverPort/intern/index")
)
res.code.code should be(200)
res.body should equal("Deleted 6 indexes")
res.body should equal("Deleted 4 indexes")
}

verify(articleIndexService).findAllIndexes(props.DraftSearchIndex)
Expand All @@ -104,10 +100,6 @@ class InternControllerTest extends UnitSuite with TestEnvironment with TapirCont
verify(tagIndexService).deleteIndexWithName(Some("index8"))
verifyNoMoreInteractions(tagIndexService)

verify(grepCodesIndexService).findAllIndexes(props.DraftGrepCodesSearchIndex)
verify(grepCodesIndexService).deleteIndexWithName(Some("index9"))
verify(grepCodesIndexService).deleteIndexWithName(Some("index10"))
verifyNoMoreInteractions(grepCodesIndexService)
}

test("That DELETE /index fails if at least one index isn't found, and no indexes are deleted") {
Expand Down Expand Up @@ -140,22 +132,18 @@ class InternControllerTest extends UnitSuite with TestEnvironment with TapirCont
) {
reset(
articleIndexService,
tagIndexService,
grepCodesIndexService
tagIndexService
)

when(articleIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index1", "index2")))
when(tagIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index7", "index8")))
when(grepCodesIndexService.findAllIndexes(any[String])).thenReturn(Success(List("index9", "index10")))

doReturn(Success(""), Nil: _*).when(articleIndexService).deleteIndexWithName(Some("index1"))
doReturn(Failure(new RuntimeException("No index with name 'index2' exists")), Nil: _*)
.when(articleIndexService)
.deleteIndexWithName(Some("index2"))
doReturn(Success(""), Nil: _*).when(tagIndexService).deleteIndexWithName(Some("index7"))
doReturn(Success(""), Nil: _*).when(tagIndexService).deleteIndexWithName(Some("index8"))
doReturn(Success(""), Nil: _*).when(grepCodesIndexService).deleteIndexWithName(Some("index9"))
doReturn(Success(""), Nil: _*).when(grepCodesIndexService).deleteIndexWithName(Some("index10"))

{
val res = simpleHttpClient.send(
Expand All @@ -164,15 +152,13 @@ class InternControllerTest extends UnitSuite with TestEnvironment with TapirCont
)
res.code.code should be(500)
res.body should equal(
"Failed to delete 1 index: No index with name 'index2' exists. 5 indexes were deleted successfully."
"Failed to delete 1 index: No index with name 'index2' exists. 3 indexes were deleted successfully."
)
}

verify(articleIndexService).deleteIndexWithName(Some("index1"))
verify(articleIndexService).deleteIndexWithName(Some("index2"))
verify(tagIndexService).deleteIndexWithName(Some("index7"))
verify(tagIndexService).deleteIndexWithName(Some("index8"))
verify(grepCodesIndexService).deleteIndexWithName(Some("index9"))
verify(grepCodesIndexService).deleteIndexWithName(Some("index10"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ class WriteServiceTest extends UnitSuite with TestEnvironment {
verify(draftRepository, times(0)).updateArticle(any[Draft], any[Boolean])(any)
verify(articleIndexService, times(1)).indexAsync(any, any)(any)
verify(tagIndexService, times(1)).indexAsync(any, any)(any)
verify(grepCodesIndexService, times(1)).indexAsync(any, any)(any)
}

test("That updateArticle updates only content properly") {
Expand Down
10 changes: 6 additions & 4 deletions project/Module.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import GithubWorkflowPlugin.autoImport.*
import com.scalatsi.plugin.ScalaTsiPlugin.autoImport.{
typescriptExports,
typescriptGenerationImports,
typescriptOutputFile
typescriptOutputFile,
typescriptTaggedUnionDiscriminator
}
import org.scalafmt.sbt.ScalafmtPlugin.autoImport.*
import org.typelevel.sbt.tpolecat.TpolecatPlugin.autoImport.*
Expand Down Expand Up @@ -192,9 +193,10 @@ trait Module {

protected def typescriptSettings(imports: Seq[String], exports: Seq[String]) = {
Seq(
typescriptGenerationImports := imports,
typescriptExports := exports,
typescriptOutputFile := file("./typescript/types-backend") / s"${this.moduleName}.ts"
typescriptGenerationImports := imports,
typescriptExports := exports,
typescriptOutputFile := file("./typescript/types-backend") / s"${this.moduleName}.ts",
typescriptTaggedUnionDiscriminator := Some("typename")
)
}
}
3 changes: 2 additions & 1 deletion project/searchapi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ object searchapi extends Module {
"SubjectAggregationsDTO",
"SubjectAggsInputDTO",
"GrepSearchInputDTO",
"grep.GrepSearchResultsDTO"
"grep.GrepSearchResultsDTO",
"grep.GrepResultDTO"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,96 +7,119 @@

package no.ndla.searchapi.integration

import java.util.concurrent.Executors
import cats.implicits.toTraverseOps
import com.typesafe.scalalogging.StrictLogging
import io.circe.Decoder
import no.ndla.common.CirceUtil
import no.ndla.common.implicits.TryQuestionMark
import no.ndla.common.model.NDLADate
import no.ndla.network.NdlaClient
import no.ndla.network.model.RequestInfo
import no.ndla.searchapi.Props
import no.ndla.searchapi.caching.Memoize
import no.ndla.searchapi.model.api.GrepException
import no.ndla.searchapi.model.grep.*
import sttp.client3.quick.*

import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success, Try}
import java.io.File
import java.nio.file.Files
import scala.util.Using.Releasable
import scala.util.{Failure, Success, Try, Using}

trait GrepApiClient {
this: NdlaClient & Props =>
val grepApiClient: GrepApiClient

class GrepApiClient extends StrictLogging {
import props.GrepApiUrl
private val GrepApiEndpoint = s"$GrepApiUrl/kl06/v201906"
private val grepDumpUrl = s"$GrepApiUrl/kl06/v201906/dump/json"

private def getAllKjerneelementer: Try[List[GrepKjerneelement]] =
get[List[GrepKjerneelement]](s"$GrepApiEndpoint/kjerneelementer-lk20/").map(_.distinct)

private def getAllKompetansemaal: Try[List[GrepKompetansemaal]] =
get[List[GrepKompetansemaal]](s"$GrepApiEndpoint/kompetansemaal-lk20/").map(_.distinct)

private def getAllKompetansemaalSett: Try[List[GrepKompetansemaalSett]] =
get[List[GrepKompetansemaalSett]](s"$GrepApiEndpoint/kompetansemaalsett-lk20/").map(_.distinct)

private def getAllTverrfagligeTemaer: Try[List[GrepTverrfagligTema]] =
get[List[GrepTverrfagligTema]](s"$GrepApiEndpoint/tverrfaglige-temaer-lk20/").map(_.distinct)

private def getAllLaereplaner: Try[List[GrepLaererplan]] =
get[List[GrepLaererplan]](s"$GrepApiEndpoint/laereplaner-lk20/").map(_.distinct)
/** Reads the whole of `file` into a single string.
  *
  * The file is decoded explicitly as UTF-8 instead of with the JVM default charset, so non-ASCII characters are read
  * the same way on every host. Line terminators are dropped: the lines are concatenated without a separator.
  *
  * @param file
  *   the file to read
  * @return
  *   `Success` with the concatenated lines, or `Failure` if opening or reading the file throws
  */
private def readFile(file: File): Try[String] = Try {
  // `Using.resource` closes the source even when reading throws.
  Using.resource(scala.io.Source.fromFile(file, "UTF-8")) { source =>
    source.getLines().mkString
  }
}

// NOTE: We add a helper so we don't have to provide `()` where this is used :^)
val getGrepBundle: () => Try[GrepBundle] = () => _getGrepBundle(())
/** Reads and decodes every file found directly inside the `path` sub-folder of `dump`.
  *
  * @param dump
  *   root directory to look in
  * @param path
  *   name of the sub-folder, relative to `dump`, whose files should be decoded
  * @param d
  *   decoder used for each file
  * @return
  *   `Success` with one decoded value per file, or the first `Failure` encountered: the folder could not be listed, a
  *   file could not be read, or a file could not be decoded as `T`
  */
private def readGrepJsonFiles[T](dump: File, path: String)(implicit d: Decoder[T]): Try[List[T]] = {
  val folder = new File(dump, path)
  // `File.list()` returns null (it does not throw) when the folder is missing, is not a directory or cannot be read.
  // Turn that into a descriptive failure instead of a NullPointerException.
  Option(folder.list()) match {
    case None =>
      Failure(new java.io.FileNotFoundException(s"Could not list grep dump folder '${folder.getAbsolutePath}'"))
    case Some(fileNames) =>
      fileNames.toList.traverse { fileName =>
        for {
          jsonStr <- readFile(new File(folder, fileName))
          parsed  <- CirceUtil.tryParseAs[T](jsonStr)
        } yield parsed
      }
  }
}

/** Decodes every file in the `kjerneelementer-lk20` folder of `dump`. */
private def getKjerneelementerLK20(dump: File): Try[List[GrepKjerneelement]] = {
  readGrepJsonFiles[GrepKjerneelement](dump, "kjerneelementer-lk20")
}

/** Decodes every file in the `kompetansemaal-lk20` folder of `dump`. */
private def getKompetansemaalLK20(dump: File): Try[List[GrepKompetansemaal]] = {
  readGrepJsonFiles[GrepKompetansemaal](dump, "kompetansemaal-lk20")
}

/** Decodes every file in the `kompetansemaalsett-lk20` folder of `dump`. */
private def getKompetansemaalsettLK20(dump: File): Try[List[GrepKompetansemaalSett]] = {
  readGrepJsonFiles[GrepKompetansemaalSett](dump, "kompetansemaalsett-lk20")
}

/** Decodes every file in the `tverrfaglige-temaer-lk20` folder of `dump`. */
private def getTverrfagligeTemaerLK20(dump: File): Try[List[GrepTverrfagligTema]] = {
  readGrepJsonFiles[GrepTverrfagligTema](dump, "tverrfaglige-temaer-lk20")
}

/** Decodes every file in the `laereplaner-lk20` folder of `dump`. */
private def getLaereplanerLK20(dump: File): Try[List[GrepLaererplan]] = {
  readGrepJsonFiles[GrepLaererplan](dump, "laereplaner-lk20")
}

/** Assembles a [[GrepBundle]] from the five LK20 folders of `dump`.
  *
  * The folders are read one after another, and the first failing read is returned as the result.
  *
  * @param dump
  *   root directory containing the LK20 folders
  */
private def getBundleFromDump(dump: File): Try[GrepBundle] = {
  for {
    coreElements        <- getKjerneelementerLK20(dump)
    competenceGoals     <- getKompetansemaalLK20(dump)
    competenceGoalSets  <- getKompetansemaalsettLK20(dump)
    crossSubjectTopics  <- getTverrfagligeTemaerLK20(dump)
    curricula           <- getLaereplanerLK20(dump)
  } yield GrepBundle(
    kjerneelementer = coreElements,
    kompetansemaal = competenceGoals,
    kompetansemaalsett = competenceGoalSets,
    tverrfagligeTemaer = crossSubjectTopics,
    laereplaner = curricula
  )
}

// Zero-argument accessor for the memoized bundle: it supplies the `()` argument to `_getGrepBundle` so callers don't have to.
val getGrepBundle: () => Try[GrepBundle] = () => _getGrepBundle(())
// Memoized wrapper around `getGrepBundleUncached`.
// NOTE(review): the first argument (1000 * 60) is presumably the cache lifetime in milliseconds, i.e. one minute — confirm against `Memoize`.
private val _getGrepBundle: Memoize[Unit, Try[GrepBundle]] = new Memoize(1000 * 60, _ => getGrepBundleUncached)

/** Fetches the grep bundle without caching. In most cases the memoized [[getGrepBundle]] should be used instead. */
private def getGrepBundleUncached: Try[GrepBundle] = {
logger.info("Fetching grep in bulk...")
val startFetch = System.currentTimeMillis()
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3))

val requestInfo = RequestInfo.fromThreadContext()

/** Calls function in separate thread and converts Try to Future */
def tryToFuture[T](x: () => Try[T]) = Future {
requestInfo.setThreadContextRequestInfo()
x()
}.flatMap(Future.fromTry)
/** Lets a `File` be managed by `scala.util.Using`: releasing it deletes the file, or the directory and everything
  * below it.
  */
implicit object FileIsReleasable extends Releasable[File] {

  /** Deletes `f`, first recursing into its children when it is a directory. Deletion is best-effort: the result of
    * `delete()` is ignored.
    */
  private def deleteDirectory(f: File): Unit = {
    if (f.isDirectory) {
      // `listFiles()` returns null when the directory cannot be read. Treat that as "no children" so cleanup never
      // throws a NullPointerException that would fail the surrounding `Using` block.
      Option(f.listFiles()).foreach(_.foreach(deleteDirectory))
    }
    f.delete(): Unit
  }

  /** Recursively deletes `resource`. */
  def release(resource: File): Unit = deleteDirectory(resource)
}

val kjerneelementer = tryToFuture(() => getAllKjerneelementer)
val kompetansemaal = tryToFuture(() => getAllKompetansemaal)
val kompetansemaalsett = tryToFuture(() => getAllKompetansemaalSett)
val tverrfagligeTemaer = tryToFuture(() => getAllTverrfagligeTemaer)
val laererplaner = tryToFuture(() => getAllLaereplaner)
/** Downloads the grep dump into a fresh temporary directory, unzips it there and builds a [[GrepBundle]] from the
  * extracted files.
  *
  * The temporary directory is handed to `Using`, so it is released (see the implicit `Releasable[File]` in scope) when
  * the block finishes, whether it succeeded or not.
  *
  * NOTE(review): `.?` comes from `TryQuestionMark`; it presumably unwraps a `Success` and aborts on a `Failure` —
  * confirm against its definition.
  */
private def getGrepBundleUncached: Try[GrepBundle] = {
  val epochSeconds = NDLADate.now().toUTCEpochSecond
  val workDirPath  = Try(Files.createTempDirectory(s"grep-dump-$epochSeconds")).?
  Using(workDirPath.toFile) { workDir =>
    val archive    = fetchDump(workDir).?
    val extracted  = ZipUtil.unzip(archive, workDir, deleteArchive = true).?
    val grepBundle = getBundleFromDump(extracted).?
    logger.info("Successfully fetched grep bundle")
    grepBundle
  }
}

val x = for {
kjerne <- kjerneelementer
kompetanse <- kompetansemaal
kompetansesett <- kompetansemaalsett
tverrfag <- tverrfagligeTemaer
laere <- laererplaner
} yield GrepBundle(
kjerneelementer = kjerne,
kompetansemaal = kompetanse,
kompetansemaalsett = kompetansesett,
tverrfagligeTemaer = tverrfag,
laereplaner = laere
)
/** Signals that the grep dump could not be downloaded.
  *
  * @param message
  *   description of what went wrong; also used as the exception message
  */
case class GrepDumpDownloadException(message: String) extends RuntimeException(message) {

  /** Attaches `cause` as this exception's cause and returns this same instance, so it can be chained on construction.
    */
  def withCause(cause: Throwable): GrepDumpDownloadException = {
    // `initCause` returns a plain `Throwable`; discard it and hand back `this` to keep the precise type.
    initCause(cause): Unit
    this
  }
}

Try(Await.result(x, Duration(300, "seconds"))) match {
case Success(bundle) =>
logger.info(s"Fetched grep in ${System.currentTimeMillis() - startFetch}ms...")
Success(bundle)
/** Downloads the grep dump archive from `grepDumpUrl` into `tempDir`.
  *
  * @param tempDir
  *   directory the archive is written to, as `grep-dump.zip`
  * @return
  *   `Success` with the downloaded file when the response is successful; otherwise a `Failure` wrapping a
  *   [[GrepDumpDownloadException]], with the original exception attached as cause when sending the request threw
  */
private def fetchDump(tempDir: File): Try[File] = {
  val outputFile = new File(tempDir, "grep-dump.zip")
  logger.info(s"Downloading grep dump from $grepDumpUrl to ${outputFile.getAbsolutePath}")
  // The response body is written straight to `outputFile`.
  val request = quickRequest
    .get(uri"$grepDumpUrl")
    .response(asFile(outputFile))
  Try(simpleHttpClient.send(request)) match {
    case Success(response) if response.isSuccess => Success(outputFile)
    case Success(response) =>
      Failure(GrepDumpDownloadException(s"Failed to fetch grep dump: ${response.statusText}"))
    case Failure(ex) =>
      logger.error(s"Could not fetch grep bundle (${ex.getMessage})", ex)
      // The branch yields exactly one result; the stray, discarded `Failure(GrepException(...))` expression was removed.
      Failure(GrepDumpDownloadException(s"Failed to fetch grep dump: ${ex.getMessage}").withCause(ex))
  }
}

private def get[A: Decoder](url: String, params: (String, String)*): Try[A] = {
val request = quickRequest.get(uri"$url?$params").readTimeout(60.seconds)
ndlaClient.fetch[A](request)
}
}
}
Loading

0 comments on commit 89f6a00

Please sign in to comment.