Commit

Add list of uploaded files, fix uploading issues.
OndrejSpanel committed Oct 24, 2019
1 parent db6e4ea commit 9426ed5
Showing 8 changed files with 88 additions and 49 deletions.
22 changes: 14 additions & 8 deletions backend/src/main/scala/com/github/opengrabeso/mixtio/Storage.scala
@@ -237,27 +237,34 @@ object Storage extends FileStore {
}
}

def digest(namespace: String, userId: String, path: String): Option[String] = {
def metadata(namespace: String, userId: String, path: String): Seq[(String, String)] = {
val prefix = userFilename(namespace, path, userId)
val blobs = storage.list(bucket, BlobListOption.prefix(prefix.name))
val found = blobs.iterateAll().asScala

// there should be at most one result
found.toSeq.flatMap { i =>
assert(i.getName.startsWith(prefix.name))
val name = i.getName.drop(prefix.name.length)
val m = try {
val md = storage.get(bucket, i.getName, BlobGetOption.fields(BlobField.METADATA))
val userData = md.getMetadata.asScala
userData.get("digest")
md.getMetadata.asScala.toSeq
} catch {
case e: Exception =>
e.printStackTrace()
None
Nil
}
m
//println(s"enum '$name' - '$userId': md '$m'")
}.headOption
}
}

def metadataValue(namespace: String, userId: String, path: String, name: String): Option[String] = {
val md = metadata(namespace, userId, path)
md.find(_._1 == name).map(_._2)
}

def digest(namespace: String, userId: String, path: String): Option[String] = {
metadataValue(namespace, userId, path, "digest")
}

// return true when the digest matches (i.e. the file does not need to be updated)
@@ -266,11 +273,10 @@ object Storage extends FileStore {
oldDigest.contains(digestToCompare)
}

// not used, consider removing (not well tested on GCS)
def updateMetadata(file: String, metadata: Seq[(String, String)]): Boolean = {
val blobId = fileId(file)
val md = storage.get(blobId, BlobGetOption.fields(BlobField.METADATA))
val userData = md.getMetadata.asScala
val userData = Option(md.getMetadata).getOrElse(new java.util.HashMap[String, String]).asScala
val matching = metadata.forall { case (key, name) =>
userData.get(key).contains(name)
}
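The refactor above generalizes the old digest lookup: metadata returns all user-defined metadata of a blob, metadataValue picks out a single key, and digest becomes a thin wrapper over it. A minimal sketch of how the three helpers compose, assuming hypothetical namespace, user and path values (not taken from the commit):

val namespace = "stage"        // hypothetical example values
val userId = "some-user-id"
val path = "some-activity.fit"

// all user metadata entries stored with the blob
val allMeta: Seq[(String, String)] = Storage.metadata(namespace, userId, path)

// a single entry, e.g. the upload status written by PushRestAPIServer below
val status: Option[String] = Storage.metadataValue(namespace, userId, path, "status")

// digest is now just metadataValue specialized to the "digest" key
val fileDigest: Option[String] = Storage.digest(namespace, userId, path)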
@@ -32,12 +32,13 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
Storage.check(Main.namespace.stage, userId, file, digest)
}.map(_._1)

val pushNamespace = Main.namespace.pushProgress(session)
for (f <- needed) {
// TODO: store some file information (size, ...)
Storage.store(Main.namespace.pushProgress(session), f, userId, Anything, Anything)
Storage.store(pushNamespace, f, userId, Anything, Anything, metadata = Seq("status" -> "offered"))
}
// write the started tag once the pending files information is complete, to mark that it can be scanned now
Storage.store(Main.namespace.pushProgress(session), markerFileName, userId, Anything, Anything)
Storage.store(pushNamespace, markerFileName, userId, Anything, Anything)
println(s"offered $needed for $userId")
needed
}
@@ -48,11 +49,11 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
val stream = new ByteArrayInputStream(content)
val decompressed = decompressStream(stream)

requests.Upload.storeFromStreamWithDigest(userId, id, localTimeZone, decompressed, digest)
val pushNamespace = Main.namespace.pushProgress(session)
if (true) { // disable for debugging - prevent any upload to debug the upload progress view
Storage.delete(Storage.FullName(pushNamespace, id, userId))
if (true) { // disable for debugging
requests.Upload.storeFromStreamWithDigest(userId, id, localTimeZone, decompressed, digest)
}
val pushNamespace = Main.namespace.pushProgress(session)
Storage.updateMetadata(Storage.FullName(pushNamespace, id, userId).name, metadata = Seq("status" -> "pushed"))
println(s"pushed $id for $userId")
}

@@ -61,17 +62,26 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
val sessionPushProgress = Storage.enumerate(pushNamespace, userId)
val started = sessionPushProgress.exists(_._2 == markerFileName)
if (!started) {
Seq("") // special response - not empty, but not the list of the files yes
(Seq(""), Nil) // special response - not empty, but not the list of the files yes
} else {
val pending = for {
val pendingOrDone = for {
(_, f) <- sessionPushProgress
if f != markerFileName
} yield f
} yield {
// check metadata, if done, we can delete the file once reported
if (Storage.metadataValue(pushNamespace, userId, f, "status").contains("pushed")) {
Storage.delete(Storage.FullName(pushNamespace, f, userId))
f -> true
} else {
f -> false
}
}
val (done, pending) = pendingOrDone.toSeq.partition(_._2) // partition yields the entries flagged true (already pushed) first
if (pending.isEmpty) {
// once we return an empty response, we can delete the "started" marker file
Storage.delete(Storage.FullName(pushNamespace, markerFileName, userId))
}
pending.toSeq
(pending.map(_._1), done.map(_._1))
}
}

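Taken together, these changes introduce a small status protocol in the push-progress namespace: offerFiles stores every offered file with metadata "status" -> "offered", uploadFile switches it to "status" -> "pushed" once the payload is stored, and expected reports pushed files as done exactly once, deleting them afterwards. A hedged sketch of the status check, assuming the session, userId and Storage values that are in scope in the server code above:

val pushNamespace = Main.namespace.pushProgress(session)

// a file counts as done once its status metadata has been switched to "pushed"
def isPushed(file: String): Boolean =
  Storage.metadataValue(pushNamespace, userId, file, "status").contains("pushed")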
12 changes: 9 additions & 3 deletions build.sbt
@@ -4,7 +4,7 @@ import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}

lazy val commonSettings = Seq(
organization := "com.github.ondrejspanel",
version := "0.1.10-beta",
version := "0.1.11-beta",
scalaVersion := "2.12.10",
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
)
Expand All @@ -20,7 +20,7 @@ lazy val jvmLibs = Seq(
"org.scalatest" %% "scalatest" % "3.0.8" % "test",

"io.udash" %% "udash-core" % udashVersion,
"io.udash" %% "udash-rest" % udashVersion excludeAll ExclusionRule(organization = "io.netty"),
"io.udash" %% "udash-rest" % udashVersion,
"io.udash" %% "udash-rpc" % udashVersion,
"io.udash" %% "udash-css" % udashVersion,
)
@@ -84,7 +84,13 @@ lazy val pushUploader = (project in file("push-uploader"))
name := "MixtioStart",
commonSettings,
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.9",
libraryDependencies ++= commonLibs ++ jvmLibs
libraryDependencies ++= commonLibs ++ jvmLibs,
assemblyMergeStrategy in assembly := {
case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
)

def inDevMode = true || sys.props.get("dev.mode").exists(value => value.equalsIgnoreCase("true"))
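The two build.sbt changes belong together: dropping the io.netty exclusion from udash-rest pulls several netty artifacts back into the MixtioStart assembly, and their META-INF/io.netty.versions.properties files would otherwise collide during sbt-assembly deduplication, so the added merge strategy discards that file (presumably safe, since it only carries netty version bookkeeping).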
@@ -5,6 +5,6 @@ package push

import io.udash._

case class PageModel(s: settings_base.SettingsModel, pending: Seq[String])
case class PageModel(s: settings_base.SettingsModel, pending: Seq[String] = Seq(""), done: Seq[String] = Seq(""))

object PageModel extends HasModelPropertyCreator[PageModel]
@@ -23,22 +23,15 @@ class PageView(

def getTemplate: Modifier = {

div(
ss.flexContainer,
div(
ss.container,
ss.flexItem,
template(model.subModel(_.s), presenter),
showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render)
),
produce(model.subSeq(_.pending)) { pending =>
if (pending.nonEmpty) {
def produceList(seq: ReadableSeqProperty[String], title: String) = {
produce(seq) { file =>
if (file.nonEmpty) {
div(
ss.container,
ss.flexItem,
table(
tr(th(h2("Uploading:"))),
pending.map(file =>
tr(th(h2(title))),
file.map(file =>
tr(td(niceFileName(file)))
)
)
@@ -47,6 +40,19 @@
div().render
}
}
}

div(
ss.flexContainer,
div(
ss.container,
ss.flexItem,
template(model.subModel(_.s), presenter),
showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render)
),

produceList(model.subSeq(_.pending), "Uploading:"),
produceList(model.subSeq(_.done), "Uploaded:")
)
}
}
@@ -16,9 +16,10 @@ class PageViewFactory(
import scala.concurrent.ExecutionContext.Implicits.global

private def updatePending(model: ModelProperty[PageModel]): Unit = {
for (pending <- userService.api.get.push(sessionId, "").expected) {
for ((pending, done) <- userService.api.get.push(sessionId, "").expected) {
if (pending != Seq("")) {
model.subProp(_.pending).set(pending)
model.subProp(_.done).set(model.subProp(_.done).get ++ done)
}
if (pending.nonEmpty) {
dom.window.setTimeout(() => updatePending(model), 1000) // TODO: once long-poll is implemented, reduce or remove the delay
@@ -27,7 +28,7 @@
}

override def create(): (View, Presenter[PushPageState]) = {
val model = ModelProperty(PageModel(settings_base.SettingsModel(), Seq(""))) // start with non-empty placeholder until real state is confirmed
val model = ModelProperty(PageModel(settings_base.SettingsModel())) // start with non-empty placeholder until real state is confirmed

loadSettings(model.subModel(_.s), userService)

@@ -405,26 +405,33 @@ object Start extends App {
}

val userAPI = api.userAPI(userId, authCode)

if (true) {
val name = Await.result(userAPI.name, Duration.Inf)
println(s"Uploading as $name")
}

val pushAPI = userAPI.push(sessionId, localTimeZone)

val fileContent = filesToSend.map(f => f._1 -> (f._2, f._3)).toMap

val toOffer = filesToSend.map(f => f._1 -> f._2)

for {
needed <- pushAPI.offerFiles(toOffer)
_ = reportProgress(needed.size)
id <- needed
} {
val (digest, content) = fileContent(id)
def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes)
val wait = pushAPI.offerFiles(toOffer).map { needed =>
reportProgress(needed.size)

val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest)
// consider async processing here - a few requests in parallel could improve throughput
Await.result(upload, Duration.Inf)
// TODO: reportProgress after each file
needed.foreach { id =>
val (digest, content) = fileContent(id)
def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes)

val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest)
// consider async processing here - a few requests in parallel could improve throughput
Await.result(upload, Duration.Inf)
// TODO: reportProgress after each file

}
}
Await.result(wait, Duration.Inf)
reportProgress(0)


@@ -17,9 +17,12 @@ trait PushRestAPI {
@PUT
def uploadFile(@Query id: String, content: Array[Byte], digest: String): Future[Unit]

// check which files are still pending (offered but not uploaded)
/**
  * Check which files are still pending (offered but not uploaded) or done.
  * Note: a file is reported as "done" only once.
  */
@GET
def expected: Future[Seq[String]]
def expected: Future[(Seq[String], Seq[String])]
}

object PushRestAPI extends RestApiCompanion[EnhancedRestImplicits,PushRestAPI](EnhancedRestImplicits)
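With expected now returning a pair of file lists, a client polls it and treats the first element as files still pending and the second as files finished since the previous poll, as PageViewFactory does above. A minimal sketch, assuming a pushAPI client already constructed as in the Start object and the default global execution context:

import scala.concurrent.ExecutionContext.Implicits.global

pushAPI.expected.foreach { case (pending, done) =>
  // "done" entries are reported only once, so the caller accumulates them itself
  println(s"still pending: ${pending.size}, newly done: ${done.size}")
}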
