From 9426ed55d07129bd40282df270bdfb618293fcc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20=C5=A0pan=C4=9Bl?= Date: Thu, 24 Oct 2019 10:26:49 +0200 Subject: [PATCH] Add list of uploaded files, fix uploading issues. --- .../github/opengrabeso/mixtio/Storage.scala | 22 +++++++++----- .../mixtio/rest/PushRestAPIServer.scala | 30 ++++++++++++------- build.sbt | 12 ++++++-- .../frontend/views/push/PageModel.scala | 2 +- .../mixtio/frontend/views/push/PageView.scala | 30 +++++++++++-------- .../frontend/views/push/PageViewFactory.scala | 5 ++-- .../com/github/opengrabeso/mixtio/Start.scala | 29 +++++++++++------- .../opengrabeso/mixtio/rest/PushRestAPI.scala | 7 +++-- 8 files changed, 88 insertions(+), 49 deletions(-) diff --git a/backend/src/main/scala/com/github/opengrabeso/mixtio/Storage.scala b/backend/src/main/scala/com/github/opengrabeso/mixtio/Storage.scala index cd2b7723..d25e9b5d 100644 --- a/backend/src/main/scala/com/github/opengrabeso/mixtio/Storage.scala +++ b/backend/src/main/scala/com/github/opengrabeso/mixtio/Storage.scala @@ -237,7 +237,7 @@ object Storage extends FileStore { } } - def digest(namespace: String, userId: String, path: String): Option[String] = { + def metadata(namespace: String, userId: String, path: String): Seq[(String, String)] = { val prefix = userFilename(namespace, path, userId) val blobs = storage.list(bucket, BlobListOption.prefix(prefix.name)) val found = blobs.iterateAll().asScala @@ -245,19 +245,26 @@ object Storage extends FileStore { // there should be at most one result found.toSeq.flatMap { i => assert(i.getName.startsWith(prefix.name)) - val name = i.getName.drop(prefix.name.length) val m = try { val md = storage.get(bucket, i.getName, BlobGetOption.fields(BlobField.METADATA)) - val userData = md.getMetadata.asScala - userData.get("digest") + md.getMetadata.asScala.toSeq } catch { case e: Exception => e.printStackTrace() - None + Nil } m //println(s"enum '$name' - '$userId': md '$m'") - }.headOption + } + } + + def metadataValue(namespace: String, userId: String, path: String, name: String): Option[String] = { + val md = metadata(namespace, userId, path) + md.find(_._1 == name).map(_._2) + } + + def digest(namespace: String, userId: String, path: String): Option[String] = { + metadataValue(namespace, userId, path, "digest") } // return true when the digest is matching (i.e. file does not need to be updated) @@ -266,11 +273,10 @@ object Storage extends FileStore { oldDigest.contains(digestToCompare) } - // not used, consider removing (not well tested on GCS) def updateMetadata(file: String, metadata: Seq[(String, String)]): Boolean = { val blobId = fileId(file) val md = storage.get(blobId, BlobGetOption.fields(BlobField.METADATA)) - val userData = md.getMetadata.asScala + val userData = Option(md.getMetadata).getOrElse(new java.util.HashMap[String, String]).asScala val matching = metadata.forall { case (key, name) => userData.get(key).contains(name) } diff --git a/backend/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPIServer.scala b/backend/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPIServer.scala index 45ef7734..d2b6d108 100644 --- a/backend/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPIServer.scala +++ b/backend/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPIServer.scala @@ -32,12 +32,13 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon Storage.check(Main.namespace.stage, userId, file, digest) }.map(_._1) + val pushNamespace = Main.namespace.pushProgress(session) for (f <- needed) { // TODO: store some file information (size, ...) - Storage.store(Main.namespace.pushProgress(session), f, userId, Anything, Anything) + Storage.store(pushNamespace, f, userId, Anything, Anything, metadata = Seq("status" -> "offered")) } // write started tag once the pending files information is complete, to mark it can be scanned now - Storage.store(Main.namespace.pushProgress(session), markerFileName, userId, Anything, Anything) + Storage.store(pushNamespace, markerFileName, userId, Anything, Anything) println(s"offered $needed for $userId") needed } @@ -48,11 +49,11 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon val stream = new ByteArrayInputStream(content) val decompressed = decompressStream(stream) - requests.Upload.storeFromStreamWithDigest(userId, id, localTimeZone, decompressed, digest) - val pushNamespace = Main.namespace.pushProgress(session) - if (true) { // disable for debugging - prevent any upload to debug the upload progress view - Storage.delete(Storage.FullName(pushNamespace, id, userId)) + if (true) { // disable for debugging + requests.Upload.storeFromStreamWithDigest(userId, id, localTimeZone, decompressed, digest) } + val pushNamespace = Main.namespace.pushProgress(session) + Storage.updateMetadata(Storage.FullName(pushNamespace, id, userId).name, metadata = Seq("status" -> "pushed")) println(s"pushed $id for $userId") } @@ -61,17 +62,26 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon val sessionPushProgress = Storage.enumerate(pushNamespace, userId) val started = sessionPushProgress.exists(_._2 == markerFileName) if (!started) { - Seq("") // special response - not empty, but not the list of the files yes + (Seq(""), Nil) // special response - not empty, but not the list of the files yes } else { - val pending = for { + val pendingOrDone = for { (_, f) <- sessionPushProgress if f != markerFileName - } yield f + } yield { + // check metadata, if done, we can delete the file once reported + if (Storage.metadataValue(pushNamespace, userId, f, "status").contains("pushed")) { + Storage.delete(Storage.FullName(pushNamespace, f, userId)) + f -> true + } else { + f -> false + } + } + val (pending, done) = pendingOrDone.toSeq.partition(_._2) if (pending.isEmpty) { // once we return empty response, we can delete the "started" marker file Storage.delete(Storage.FullName(pushNamespace, markerFileName, userId)) } - pending.toSeq + (pending.map(_._1), done.map(_._1)) } } diff --git a/build.sbt b/build.sbt index b1857ac7..e17eb72e 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType} lazy val commonSettings = Seq( organization := "com.github.ondrejspanel", - version := "0.1.10-beta", + version := "0.1.11-beta", scalaVersion := "2.12.10", scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") ) @@ -20,7 +20,7 @@ lazy val jvmLibs = Seq( "org.scalatest" %% "scalatest" % "3.0.8" % "test", "io.udash" %% "udash-core" % udashVersion, - "io.udash" %% "udash-rest" % udashVersion excludeAll ExclusionRule(organization = "io.netty"), + "io.udash" %% "udash-rest" % udashVersion, "io.udash" %% "udash-rpc" % udashVersion, "io.udash" %% "udash-css" % udashVersion, ) @@ -84,7 +84,13 @@ lazy val pushUploader = (project in file("push-uploader")) name := "MixtioStart", commonSettings, libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.9", - libraryDependencies ++= commonLibs ++ jvmLibs + libraryDependencies ++= commonLibs ++ jvmLibs, + assemblyMergeStrategy in assembly := { + case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) + } ) def inDevMode = true || sys.props.get("dev.mode").exists(value => value.equalsIgnoreCase("true")) diff --git a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageModel.scala b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageModel.scala index df19496c..835f0247 100644 --- a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageModel.scala +++ b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageModel.scala @@ -5,6 +5,6 @@ package push import io.udash._ -case class PageModel(s: settings_base.SettingsModel, pending: Seq[String]) +case class PageModel(s: settings_base.SettingsModel, pending: Seq[String] = Seq(""), done: Seq[String] = Seq("")) object PageModel extends HasModelPropertyCreator[PageModel] diff --git a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageView.scala b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageView.scala index 0ac53f5f..6cddc9ef 100644 --- a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageView.scala +++ b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageView.scala @@ -23,22 +23,15 @@ class PageView( def getTemplate: Modifier = { - div( - ss.flexContainer, - div( - ss.container, - ss.flexItem, - template(model.subModel(_.s), presenter), - showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render) - ), - produce(model.subSeq(_.pending)) { pending => - if (pending.nonEmpty) { + def produceList(seq: ReadableSeqProperty[String], title: String) = { + produce(seq) { file => + if (file.nonEmpty) { div( ss.container, ss.flexItem, table( - tr(th(h2("Uploading:"))), - pending.map(file => + tr(th(h2(title))), + file.map(file => tr(td(niceFileName(file))) ) ) @@ -47,6 +40,19 @@ class PageView( div().render } } + } + + div( + ss.flexContainer, + div( + ss.container, + ss.flexItem, + template(model.subModel(_.s), presenter), + showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render) + ), + + produceList(model.subSeq(_.pending), "Uploading:"), + produceList(model.subSeq(_.done), "Uploaded:") ) } } \ No newline at end of file diff --git a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageViewFactory.scala b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageViewFactory.scala index e4c3294d..781ccab9 100644 --- a/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageViewFactory.scala +++ b/frontend/src/main/scala/com/github/opengrabeso/mixtio/frontend/views/push/PageViewFactory.scala @@ -16,9 +16,10 @@ class PageViewFactory( import scala.concurrent.ExecutionContext.Implicits.global private def updatePending(model: ModelProperty[PageModel]): Unit = { - for (pending <- userService.api.get.push(sessionId, "").expected) { + for ((pending, done) <- userService.api.get.push(sessionId, "").expected) { if (pending != Seq("")) { model.subProp(_.pending).set(pending) + model.subProp(_.done).set(model.subProp(_.done).get ++ done) } if (pending.nonEmpty) { dom.window.setTimeout(() => updatePending(model), 1000) // TODO: once long-poll is implemented, reduce or remove the delay @@ -27,7 +28,7 @@ class PageViewFactory( } override def create(): (View, Presenter[PushPageState]) = { - val model = ModelProperty(PageModel(settings_base.SettingsModel(), Seq(""))) // start with non-empty placeholder until real state is confirmed + val model = ModelProperty(PageModel(settings_base.SettingsModel())) // start with non-empty placeholder until real state is confirmed loadSettings(model.subModel(_.s), userService) diff --git a/push-uploader/src/main/scala/com/github/opengrabeso/mixtio/Start.scala b/push-uploader/src/main/scala/com/github/opengrabeso/mixtio/Start.scala index 20fec600..14a63393 100644 --- a/push-uploader/src/main/scala/com/github/opengrabeso/mixtio/Start.scala +++ b/push-uploader/src/main/scala/com/github/opengrabeso/mixtio/Start.scala @@ -405,26 +405,33 @@ object Start extends App { } val userAPI = api.userAPI(userId, authCode) + + if (true) { + val name = Await.result(userAPI.name, Duration.Inf) + println(s"Uploading as $name") + } + val pushAPI = userAPI.push(sessionId, localTimeZone) val fileContent = filesToSend.map(f => f._1 -> (f._2, f._3)).toMap val toOffer = filesToSend.map(f => f._1 -> f._2) - for { - needed <- pushAPI.offerFiles(toOffer) - _ = reportProgress(needed.size) - id <- needed - } { - val (digest, content) = fileContent(id) - def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes) + val wait = pushAPI.offerFiles(toOffer).map { needed => + reportProgress(needed.size) - val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest) - // consider async processing here - a few requests in parallel could improve throughput - Await.result(upload, Duration.Inf) - // TODO: reportProgress after each file + needed.foreach { id => + val (digest, content) = fileContent(id) + def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes) + val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest) + // consider async processing here - a few requests in parallel could improve throughput + Await.result(upload, Duration.Inf) + // TODO: reportProgress after each file + + } } + Await.result(wait, Duration.Inf) reportProgress(0) diff --git a/shared-js/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPI.scala b/shared-js/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPI.scala index a90bb019..423988da 100644 --- a/shared-js/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPI.scala +++ b/shared-js/src/main/scala/com/github/opengrabeso/mixtio/rest/PushRestAPI.scala @@ -17,9 +17,12 @@ trait PushRestAPI { @PUT def uploadFile(@Query id: String, content: Array[Byte], digest: String): Future[Unit] - // check which files are still pending (offered but not uploaded) + /** + check which files are still pending (offered but not uploaded) or done + Note: file is reported as "done" only once + */ @GET - def expected: Future[Seq[String]] + def expected: Future[(Seq[String], Seq[String])] } object PushRestAPI extends RestApiCompanion[EnhancedRestImplicits,PushRestAPI](EnhancedRestImplicits)