Skip to content

Commit

Permalink
Client / server protocol version handling and reporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
OndrejSpanel committed Feb 12, 2020
1 parent 006e237 commit 5d1c36c
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ object PushRestAPIServer {
// we want the file to exist, but we do not care for the content (and we are never reading it)
@SerialVersionUID(10L)
case object Anything

@SerialVersionUID(10L)
case class TextFile(text: String)
}

class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZone: String) extends PushRestAPI with RestAPIUtils {
Expand All @@ -38,7 +41,7 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
Storage.store(pushNamespace, f, userId, Anything, Anything, metadata = Seq("status" -> "offered"))
}
// write started tag once the pending files information is complete, to mark it can be scanned now
Storage.store(pushNamespace, markerFileName, userId, Anything, Anything)
Storage.store(pushNamespace, markerFileName, userId, TextFile(""), TextFile(""))
println(s"offered $needed for $userId")
needed
}
Expand All @@ -57,12 +60,19 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
println(s"pushed $id for $userId")
}

def reportError(error: String) = syncResponse {
val pushNamespace = Main.namespace.pushProgress(session)
// write the marker, write an error into it
println(s"Reported error $error")
Storage.store(pushNamespace, markerFileName, userId, TextFile(error), TextFile(error))
}

def expected = syncResponse {
val pushNamespace = Main.namespace.pushProgress(session)
val sessionPushProgress = Storage.enumerate(pushNamespace, userId)
val started = sessionPushProgress.exists(_._2 == markerFileName)
if (!started) {
(Seq(""), Nil) // special response - not empty, but not the list of the files yes
(Seq(""), Nil, None) // special response - not empty, but not the list of the files yes
} else {
val pendingOrDone = for {
(_, f) <- sessionPushProgress
Expand All @@ -78,10 +88,17 @@ class PushRestAPIServer(parent: UserRestAPIServer, session: String, localTimeZon
}
val (pending, done) = pendingOrDone.toSeq.partition(_._2)
if (pending.isEmpty) {
val markerContent = Storage.load[TextFile](Storage.FullName(pushNamespace, markerFileName, userId))
// once we return empty response, we can delete the "started" marker file
println("push finished")
for (error <- markerContent) {
println(s" Error ${error.text}")
}
Storage.delete(Storage.FullName(pushNamespace, markerFileName, userId))
(pending.map(_._1), done.map(_._1), markerContent.map(_.text))
} else {
(pending.map(_._1), done.map(_._1), None)
}
(pending.map(_._1), done.map(_._1))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,23 @@ object RestAPIServer extends RestAPI with RestAPIUtils {
auth
}

def limitedSession(userId: String, authCode: String) = syncResponse {
private def createUploadSession(userId: String, authCode: String) = {
val sessionId = "limited-session-" + System.currentTimeMillis().toString
val auth = StravaAuthResult(authCode, "", "", 0, "", userId, "", sessionId)
createUser(auth)
auth.sessionId

}
def uploadSession(userId: String, authCode: String, version: String) = syncResponse {
if (version != RestAPI.apiVersion) {
throw HttpErrorException(403, s"API version required: $version, client API version ${RestAPI.apiVersion} ")
} else {
createUploadSession(userId, authCode)
}
}

def reportUploadSessionError(userId: String, authCode: String) = syncResponse {
createUploadSession(userId, authCode)
}

def userAPI(userId: String, authCode: String, session: String): UserRestAPI = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ package push

import io.udash._

case class PageModel(s: settings_base.SettingsModel, pending: Seq[String] = Seq(""), done: Seq[String] = Seq(""))
case class PageModel(s: settings_base.SettingsModel, pending: Seq[String] = Seq(""), done: Seq[String] = Seq(""), result: String = "")

object PageModel extends HasModelPropertyCreator[PageModel]
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class PageView(
ss.container,
ss.flexItem,
template(model.subModel(_.s), presenter),
showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render)
showIf(model.subSeq(_.pending).transform(_.isEmpty))(submitButton.render),
div(h3(bind(model.subProp(_.result)))).render
),

produceList(model.subSeq(_.pending), "Uploading:"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ class PageViewFactory(
import scala.concurrent.ExecutionContext.Implicits.global

private def updatePending(model: ModelProperty[PageModel]): Unit = {
for ((pending, done) <- userService.api.get.push(sessionId, "").expected) {
for ((pending, done, result) <- userService.api.get.push(sessionId, "").expected) {
if (pending != Seq("")) {
model.subProp(_.pending).set(pending)
model.subProp(_.done).set(model.subProp(_.done).get ++ done)
}
if (pending.nonEmpty) {
dom.window.setTimeout(() => updatePending(model), 1000) // TODO: once long-poll is implemented, reduce or remove the delay
}
result.foreach { r =>
model.subProp(_.result).set(r)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import akka.util.ByteString

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise, duration}
import scala.util.{Success, Try}
import scala.util.{Failure, Success, Try}
import scala.xml.Elem
import java.time.{ZoneId, ZonedDateTime}

import com.github.opengrabeso.mixtio.rest.RestAPI
import com.softwaremill.sttp.SttpBackend
import common.Util._
import io.udash.rest.SttpRestClient
import io.udash.rest.raw.HttpErrorException
import shared.Digest

import scala.util.control.NonFatal
Expand Down Expand Up @@ -404,33 +405,56 @@ object Start extends App {
(f, digest, fileBytes)
}

val createSession = api.limitedSession(userId, authCode)
val pushSessionId = Await.result(createSession, Duration.Inf)
val createSession = api.uploadSession(userId, authCode, RestAPI.apiVersion)
Try {
Await.result(createSession, Duration.Inf)
} match {
case Success(pushSessionId) =>

val userAPI = api.userAPI(userId, authCode, pushSessionId)
val userAPI = api.userAPI(userId, authCode, pushSessionId)

val pushAPI = userAPI.push(sessionId, localTimeZone)
val pushAPI = userAPI.push(sessionId, localTimeZone)

val fileContent = filesToSend.map(f => f._1 -> (f._2, f._3)).toMap
val fileContent = filesToSend.map(f => f._1 -> (f._2, f._3)).toMap

val toOffer = filesToSend.map(f => f._1 -> f._2)
val toOffer = filesToSend.map(f => f._1 -> f._2)

val wait = pushAPI.offerFiles(toOffer).map { needed =>
reportProgress(needed.size)
val wait = pushAPI.offerFiles(toOffer).map { needed =>
reportProgress(needed.size)

needed.foreach { id =>
val (digest, content) = fileContent(id)
def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes)
needed.foreach { id =>
val (digest, content) = fileContent(id)
def gzipEncoded(bytes: Array[Byte]) = if (useGzip) Gzip.encode(ByteString(bytes)) else ByteString(bytes)

val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest)
// consider async processing here - a few requests in parallel could improve throughput
Await.result(upload, Duration.Inf)
// TODO: reportProgress after each file

}
val upload = pushAPI.uploadFile(id, gzipEncoded(content).toArray, digest)
// consider async processing here - a few requests in parallel could improve throughput
Await.result(upload, Duration.Inf)
}
}
Await.result(wait, Duration.Inf)
reportProgress(0)
case Failure(exception) =>
println(s"Unable to connect to the server upload session, error $exception")
// event if connecting to the session has failed, try to established a session to report the error
// such session has no version requirements, therefore it should always succeed
val reportError = api.reportUploadSessionError(userId, authCode)
val pushSessionId = Await.result(reportError, Duration.Inf)


val userAPI = api.userAPI(userId, authCode, pushSessionId)
val errorString = exception match {
case http: HttpErrorException =>
if (http.payload.isEmpty) {
s"HTTP Error ${http.code}"
} else {
s"HTTP Error ${http.code}: ${http.payload.get}"
}
case ex =>
ex.toString
}
val wait = userAPI.push(sessionId, localTimeZone).reportError(errorString)
Await.result(wait, Duration.Inf)
}
Await.result(wait, Duration.Inf)
reportProgress(0)


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.udash.rest._
import scala.concurrent.Future

trait PushRestAPI {

/**
* offer files to the server, server will respond which files need to be sent
* for each file send file id + file digest
Expand All @@ -22,7 +23,10 @@ trait PushRestAPI {
Note: file is reported as "done" only once
*/
@GET
def expected: Future[(Seq[String], Seq[String])]
def expected: Future[(Seq[String], Seq[String], Option[String])]

@PUT
def reportError(toString: String): Future[Unit]
}

object PushRestAPI extends RestApiCompanion[EnhancedRestImplicits,PushRestAPI](EnhancedRestImplicits)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.udash.rest._
import scala.concurrent.Future

trait RestAPI {

@GET
def identity(@Path in: String): Future[String]

Expand All @@ -20,7 +21,12 @@ trait RestAPI {
def now: Future[ZonedDateTime]

// create a limited session (no Strava access) - used for push uploader
def limitedSession(userId: String, authCode: String): Future[String]
def uploadSession(userId: String, authCode: String, version: String): Future[String]

// create even more limited session - used for push uploader error reporting
def reportUploadSessionError(userId: String, authCode: String): Future[String]
}

object RestAPI extends RestApiCompanion[EnhancedRestImplicits,RestAPI](EnhancedRestImplicits)
object RestAPI extends RestApiCompanion[EnhancedRestImplicits,RestAPI](EnhancedRestImplicits) {
final val apiVersion = "1.0"
}

0 comments on commit 5d1c36c

Please sign in to comment.