diff --git a/build.gradle b/build.gradle index 24cde6e..a9e5220 100644 --- a/build.gradle +++ b/build.gradle @@ -1,19 +1,67 @@ plugins { id 'scala' + id 'idea' id "com.github.maiflai.scalatest" version "0.24" } repositories { mavenCentral() + jcenter() +} +configurations { + scalaCompilerPlugin } dependencies { implementation 'org.scala-lang:scala-library:2.12.8' + implementation 'org.scala-lang.modules:scala-async_2.12:0.9.7' + compile 'com.monovore:decline_2.12:0.6.1' + compile 'org.typelevel:cats-core_2.12:1.5.0' + compile group: 'com.criteo.lolhttp', name: 'lolhttp_2.12', version: '10.0.0' + compile group: 'com.criteo.lolhttp', name: 'lolhtml_2.12', version: '10.0.0' + compile group: 'com.criteo.lolhttp', name: 'loljson_2.12', version: '10.0.0' + compile group: 'io.circe', name: 'circe-core_2.12', version: '0.11.1' + compile group: 'io.circe', name: 'circe-generic_2.12', version: '0.11.1' + compile group: 'io.circe', name: 'circe-optics_2.12', version: '0.11.0' + compile group: 'com.nulab-inc', name: 'scala-oauth2-core_2.12', version: '1.3.0' testImplementation 'org.pegdown:pegdown:1.4.2' testImplementation 'org.scalatest:scalatest_2.12:3.0.6' testImplementation 'junit:junit:4.12' + + scalaCompilerPlugin "org.scalamacros:paradise_2.12.8:2.1.0" +} + +tasks.withType(ScalaCompile) { + scalaCompileOptions.additionalParameters = [ + "-Xplugin:" + configurations.scalaCompilerPlugin.asPath + ] } task wrapper(type: Wrapper) { gradleVersion = '4.10' -} \ No newline at end of file +} + +idea{ + module { + testSourceDirs += sourceSets.main.runtimeClasspath + } +} + +task run(type: JavaExec, dependsOn: classes) { + main = 'chatapp.ChatClient' + standardInput = System.in + classpath sourceSets.main.runtimeClasspath +} + +compileJava { + options.compilerArgs += [ + "--add-modules", "java.se", + "--add-exports", "java.base/jdk.internal.ref=ALL-UNNAMED", + "--add-opens", "java.base/java.lang=ALL-UNNAMED", + "--add-opens", "java.base/java.nio=ALL-UNNAMED", + "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens", "java.management/sun.management=ALL-UNNAMED", + "--add-opens", "jdk.management/com.sun.management.internal=ALL-UNNAMED" + ] +} + diff --git a/src/main/scala/chatapp/CLI.scala b/src/main/scala/chatapp/CLI.scala new file mode 100644 index 0000000..73724ee --- /dev/null +++ b/src/main/scala/chatapp/CLI.scala @@ -0,0 +1,25 @@ +package chatapp + +import cats.implicits._ +import com.monovore.decline._ + +class ChatCLI { + +} + +// TODO: Add client and server subcommands +object ChatCLI extends CommandApp( + name = "rover-chat", + header = "Says hello!", + main = { + val userOpt = + Opts.option[String]("target", help = "Person to greet.") .withDefault("world") + + val quietOpt = Opts.flag("quiet", help = "Whether to be quiet.").orFalse + + (userOpt, quietOpt).mapN { (user, quiet) => + if (quiet) println("...") + else println(s"Hello $user!") + } + } +) diff --git a/src/main/scala/chatapp/Chat.scala b/src/main/scala/chatapp/Chat.scala index f9cec1c..c400834 100644 --- a/src/main/scala/chatapp/Chat.scala +++ b/src/main/scala/chatapp/Chat.scala @@ -3,67 +3,40 @@ package chatapp import rover.rdo.AtomicObjectState import rover.rdo.client.RdObject -// FIXME: ensure messages can be read, but not modified or reassigned... -// FIXME: after state & rd object impl change -class Chat(var messages: List[ChatMessage]) extends RdObject[String](AtomicObjectState.initial("")) { +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ - // TODO: Something with users, crypto stuff on identity - // FIXME: Hashes should be used here as well as user ids - private var users = Map[Long, String]() - - def this(messages: List[ChatMessage], users: Map[Long, String]) { - this(messages) - this.users = users - } - def addUser(id: Long, uri: String): Unit = { - if (!this.users.contains(id)) this.users updated (id, uri) - // FIXME: otherwise we can also update, a user's uri - else println("Already existing user") - } +// FIXME: ensure messages can be read, but not modified or reassigned... +// FIXME: after state & rd object impl change +class Chat(_onStateModified: Chat#Updater, _initialState: AtomicObjectState[List[ChatMessage]]) extends RdObject[List[ChatMessage]](_initialState) { +// type State = List[ChatMessage] + type Updater = AtomicObjectState[List[ChatMessage]] => Future[Unit] - def removeUser(id : Long)= { - if(users.contains(id)) users = users - id - else println("Non-exsiting id given") - } + def send(message: ChatMessage): Future[Unit]= { + val op: AtomicObjectState[List[ChatMessage]]#Op = s => s :+ message - def appendMessage(chatMessage: ChatMessage): Unit = { - messages = messages :+ chatMessage + async { + modifyState(op) + } } - def terminate(): Unit ={ - //TODO: terminate + override def onStateModified(oldState: AtomicObjectState[List[ChatMessage]]): Future[Unit] = { + _onStateModified(state) } def currentVersion(): Long = { - messages.size + immutableState.size } -// override def stableVersion: Long = { -// // TODO: the stable version (last committed) -// messages.size -// } +} - def printMessages(): Unit = { - for (i <- messages) { - println(s"body: ${i.body}, author: ${i.author}, time: ${i.timestamp}") - } +object Chat { + def fromRDO(rdo: RdObject[List[ChatMessage]], _onStateModified: Chat#Updater): Chat = { + new Chat(_onStateModified, rdo.state) } } -class ChatMessage(val body: String, - val author: String, - val timestamp: Long = java.time.Instant.now.getEpochSecond()) -{ -} - -object chatFoo{ - def main(args: Array[String]): Unit ={ - val chat = new Chat(messages = List[ChatMessage]()) - chat.addUser(1234, "tt") - val m: ChatMessage = new ChatMessage("test message", "Ioa") - chat.appendMessage(m) - chat.printMessages() - } -} diff --git a/src/main/scala/chatapp/Client.scala b/src/main/scala/chatapp/Client.scala new file mode 100644 index 0000000..cf67615 --- /dev/null +++ b/src/main/scala/chatapp/Client.scala @@ -0,0 +1,140 @@ +package chatapp + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState +import rover.rdo.client.RdObject +import rover.{Client, HTTPClient, Session} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration._ + +class ChatClient(serverAddress: String) extends + HTTPClient[List[ChatMessage]](serverAddress, (credentials) => credentials.accessToken) { +// Client.OAuth2Client[List[ChatMessage]](serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken, Map[String, AtomicObjectState[List[ChatMessage]]]()) { + var session: Session[OAuth2Credentials, List[ChatMessage]] = null + var user: ChatUser = null + var chat: Chat = null; + + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") + + s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" + } + + // TODO: This is hacky, figure out a better way to do this + val updater: Chat#Updater = state => async { + val text = state.immutableState.takeRight(ChatClient.SIZE).map(m => m.toString()).mkString("\n") + print(s"${printer(text)}") + } + + def login(user: ChatUser): Future[Unit] = { + async { + val credentials = new OAuth2Credentials("fake credentials", "fake credentials") + this.user = user + session = createSession(credentials) + val state = importRDO("chat") + val rdo = new RdObject[List[ChatMessage]](state) + chat = Chat.fromRDO(rdo, updater) +// println(s"Initial state: ${chat.state}") + } + } + + def send(message: String): Future[Unit] = { +// println(s"Sending message with intial state: ${chat.state}") + async { + await(chat.send(new ChatMessage(message, user))) + exportRDO("chat", chat.state) + // await(session.exportRDO("chat", chat)) +// updater(chat.state) + } + } + + def updateLoop(): Future[Unit] = { + async { + while(true) { +// val serverState = await(importRDO("chat")).asInstanceOf[RdObject[List[ChatMessage]]] +// chat = Chat.fromRDO(serverState, updater) +// println(s"Updating state from update loop...") + val state = importRDO("chat") +// println(s"Got updated state: $state") + // chat = Chat.fromRDO(rdo, updater) + + appendedState("chat", state) + chat.state = state + +// println(s"Rendering chat state: ${chat.state}") + updater(chat.state) // Force re-render + Thread.sleep(ChatClient.UPDATE_DELAY_MS) + } + } + } + + def render(): Future[Unit] = { +// val chat = new Chat(updater) + println(s" Welcome to Rover Chat! Connected to: $serverAddress") + print((1 to ChatClient.SIZE).map(i => "\n").mkString("")) + + val reader = () => { + print("> ") + val s = scala.io.StdIn.readLine() + s + } + val executor = (input: String) => { + async { + val p = send(input) + await(p) + // This clears the input line + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") + chat.state.immutableState.takeRight(ChatClient.SIZE).map(m => s"${m.toString()}").mkString("\n") + } + } + val repl: REPL[String] = new REPL(reader, executor, printer) +// Await.result(repl.loop(), Duration.Inf) + updateLoop() // TODO: Memory leak + repl.loop() + } + +} + +object ChatClient { + val SIZE = 10 + val UPDATE_DELAY_MS = 2000 + + def main(args: Array[String]): Unit = { + val serverAddress = "localhost" + val client = new ChatClient(serverAddress) + val f = async { + await(client.login(ChatUser.Steffan)) + await(client.render()) + } + + Await.result(f, Duration.Inf) + } +} + +object Bot { + def main(args: Array[String]): Unit = { + val serverAddress = "localhost" + val client = new ChatClient(serverAddress) + val f = async { + await(client.login(ChatUser.Giannis)) + client.updateLoop() // TODO: Memory leak + + // Simulate conversation + Thread.sleep(3000) + await(client.send("Hey man!")) + + Thread.sleep(3000) + await(client.send("How's it going?")) + + Thread.sleep(10000) + await(client.send("Yea man I'm good")) + } + + Await.result(f, Duration.Inf) + } +} \ No newline at end of file diff --git a/src/main/scala/chatapp/Message.scala b/src/main/scala/chatapp/Message.scala new file mode 100644 index 0000000..661f6b7 --- /dev/null +++ b/src/main/scala/chatapp/Message.scala @@ -0,0 +1,38 @@ +package chatapp +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Json} + + +class ChatMessage(val body: String, + val author: ChatUser, + val timestamp: Long = java.time.Instant.now.getEpochSecond()) { + + // def encoded: Json = Encoder.forProduct2("author", "body")(m => m.body, m.author.username) + + override def toString: String = { + s"${author.username}: $body" + } +} + +object ChatMessage { + implicit val encodeChatMessage: Encoder[ChatMessage] = new Encoder[ChatMessage] { + final def apply(m: ChatMessage): Json = Json.obj( + ("author", m.author.asJson), +// ("author", Json.fromString(m.author.username)), + ("body", Json.fromString(m.body)), + ("timestamp", Json.fromLong(m.timestamp)) + ) + } + + implicit val decodeAtomicObjectState: Decoder[ChatMessage] = new Decoder[ChatMessage] { + final def apply(c: HCursor): Decoder.Result[ChatMessage] = + for { + author <- c.downField("author").as[ChatUser] + body <- c.downField("body").as[String] + timestamp <- c.downField("timestamp").as[Long] + } yield { + new ChatMessage(body, author) + } + } + +} diff --git a/src/main/scala/chatapp/REPL.scala b/src/main/scala/chatapp/REPL.scala new file mode 100644 index 0000000..649c432 --- /dev/null +++ b/src/main/scala/chatapp/REPL.scala @@ -0,0 +1,66 @@ +package chatapp + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future, Promise} +import scala.async.Async.{async, await} + + +class REPL[A](reader: REPL[A]#Reader, executor: REPL[A]#Executor, printer: REPL[A]#Printer) { + type Reader = () => String + type Executor = String => Future[A] + type Printer = A => String + + def loop(): Future[Unit] = { + var looping = true + async { + while(looping) { + val input = reader() + + if (input == null || input == "/exit") { + looping = false + } else if(input == "") { + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}\r") + } else { + val result = await(executor(input)) + val output = printer(result) + + print(output) + } + } + } + } + +} + +object REPL { + val ERASE_SCREEN_AFTER="\u001b[0J" + val ERASE_LINE_BEFORE="\u001b[1K" + val ERASE_LINE_AFTER="\u001b[0K" + + val HOME="\u001b[1H" + val UP="\u001b[1A" + val DOWN="\u001b[1B" + val FORWARD="\u001b[1C" + val BACKWARD="\u001b[1D" + + val SAVE_CURSOR="\u001b[0s" + val RESTORE_CURSOR="\u001b[0u" + + def main(args: Array[String]): Unit = { + val reader = () => { + print(s"${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_SCREEN_AFTER}\r> ") + scala.io.StdIn.readLine() + } + val executor = (input: String) => { + async { input } + } + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => UP).mkString("")}$ERASE_LINE_BEFORE$ERASE_SCREEN_AFTER\r" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") + s"$cls$text\n" + } + val repl = new REPL(reader, executor, printer) + repl.loop() + } +} diff --git a/src/main/scala/chatapp/Server.scala b/src/main/scala/chatapp/Server.scala new file mode 100644 index 0000000..7b0d7ff --- /dev/null +++ b/src/main/scala/chatapp/Server.scala @@ -0,0 +1,19 @@ +package chatapp + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState +import rover.{Client, Server, Session, HTTPServer} + + +class ChatServer extends HTTPServer[List[ChatMessage]](_mapToStates = Map("chat" -> ChatServer.CHAT_STATE)) { + +} + +object ChatServer { + val CHAT_STATE = AtomicObjectState.initial(List[ChatMessage](new ChatMessage("test", ChatUser.Steffan))) + + def main(args: Array[String]): Unit = { + val server = new ChatServer + server.start() + } +} diff --git a/src/main/scala/chatapp/User.scala b/src/main/scala/chatapp/User.scala new file mode 100644 index 0000000..f5d34be --- /dev/null +++ b/src/main/scala/chatapp/User.scala @@ -0,0 +1,29 @@ +package chatapp + + +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Json} + +class ChatUser(val username: String) { +} + +object ChatUser { + val Steffan = new ChatUser("steffan") + val Giannis = new ChatUser("giannis") + + implicit val encodeUser: Encoder[ChatUser] = new Encoder[ChatUser] { + final def apply(u: ChatUser): Json = Json.obj( + ("username", Json.fromString(u.username)) + ) + } + + implicit val decodeAtomicObjectState: Decoder[ChatUser] = new Decoder[ChatUser] { + final def apply(c: HCursor): Decoder.Result[ChatUser] = + for { + username <- c.downField("username").as[String] + } yield { + new ChatUser(username) + } + } + +} \ No newline at end of file diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala new file mode 100644 index 0000000..9f1d7bf --- /dev/null +++ b/src/main/scala/rover/Client.scala @@ -0,0 +1,120 @@ +package rover + +import lol.http._ +import lol.json._ + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState + +import cats.implicits._ +import cats.effect.{ IO } + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} + +import io.circe._ + +/** + * Class encapsulating a Client, who interacts with a Server within a Session + * for exchanging RDOs (in fact their atomic states). + * @param serverAddress. the address of the corresponding server + * @param identifier, the access token granted to the client for authorized access to server + * @param mapToStates, map to up-to-date version of local RDOs + */ +//FIXME: create a unique, static id for each RDO upon its creation +class Client[C, A](protected val serverAddress: String, protected val identifier: Session[C, A]#Identifier, + protected var mapToStates: Map[String, AtomicObjectState[A]] = Map[String, AtomicObjectState[A]]()){ + + + val server = Server.fromAddress[C,A](serverAddress) + def createSession(credentials: C): Session[C, A] = { + server.createSession(credentials, this.identifier) + } + + def appendedState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + this.mapToStates = this.mapToStates + (stateId -> atomicState) + } + + def getAtomicStateWithId(stateId: String): AtomicObjectState[A] ={ + return mapToStates(stateId) + } +} + + +object Client { + class OAuth2Credentials(val accessToken: String, val refreshToken: String) {} + type OAuth2Client[A] = Client[OAuth2Credentials, A] + + def oauth2[A](): OAuth2Client[A] = { + null + } +} + +class HTTPClient[A](_serverAddress: String, _identifier: Session[OAuth2Credentials, A]#Identifier)(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends Client[OAuth2Credentials, A](_serverAddress, _identifier) { + + implicit val stateEncoder: Encoder[AtomicObjectState[A]] = Encoder.forProduct1("immutableState")(rdo => (rdo.immutableState)) + // implicit val stateDecoder: Decoder[AtomicObjectState[List[ChatMessage]]] = deriveDecoder[AtomicObjectState[List[ChatMessage]]] +// implicit val rdoEncoder: Encoder[RdObject[A]] = Encoder.forProduct1("state")(rdo => (rdo.state)) + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } + + def importRDO(objectId: String): AtomicObjectState[A] = { + val roverClient = lol.http.Client(serverAddress, 8888, "http") + val userAgent = h"User-Agent" -> h"lolhttp" + + val getState = (for { + result <- roverClient.run(Get(s"/api/rdo/$objectId").addHeaders(userAgent)) { + _.readSuccessAs[Json].map(json => { + json.as[AtomicObjectState[A]] + }) + } + + state <- IO { + result.right.get + } + _ <- roverClient.stop() + + } yield (state)).onError { case _ => roverClient.stop() } + + + getState.unsafeRunSync + + } + + def exportRDO(objectId: String, state: AtomicObjectState[A]): Unit = { + val roverClient = lol.http.Client(serverAddress, 8888, "http") + val userAgent = h"User-Agent" -> h"lolhttp" + +// println(s"Exporting RDO $objectId as $state") + val setState = (for { + result <- roverClient.run(Post(s"/api/rdo/$objectId", stateEncoder.apply(state).toString).addHeaders(userAgent)) { + _.readSuccessAs[Json].map(json => { +// println(s"Received JSON response: $json") +// json.as[AtomicObjectState[A]] + }) + } + +// state <- IO { +// if (result.isLeft) { +// println(s"${result.left.get}") +// } +// result.right.get +// } + _ <- roverClient.stop() + + } yield (state)).onError { case _ => roverClient.stop() } + + + setState.unsafeRunSync + + } + +} \ No newline at end of file diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala new file mode 100644 index 0000000..fa278a8 --- /dev/null +++ b/src/main/scala/rover/Server.scala @@ -0,0 +1,330 @@ +package rover + +import java.security.Timestamp +import java.util.Date + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState +import rover.rdo.client.{CommonAncestor, RdObject} +import lol.http._ +import lol.json._ + +import scala.concurrent.ExecutionContext.Implicits.global +import cats.effect.IO +import chatapp.{ChatMessage, ChatUser} +import io.circe._ +import io.circe.syntax._ +import io.circe.generic.semiauto._ +import scalaoauth2.provider.{AuthInfo, AuthorizationRequest, ClientCredential, DataHandler, Password, TokenEndpoint} + +import scala.util.Try +import scala.async.Async.{async, await} +import scala.concurrent.Future + + +/** + * Encapsulating the logic of the server. + * @param address of the server + * @param mapToClients, map to clients using as key the access token granted to the client + * @param mapToStates, map to stable (committed) states of RDOs +*/ +class Server[C, A]( protected val address: String, + protected val mapToClients: Map[Session[C, A]#Identifier, Client[C, A]], protected var mapToStates: Map[String, AtomicObjectState[A]]) { + + //FIXME: Does the server has its own creds? It merely keeps track of clients' creds +// val credentials = null + + // TODO: Determine what to do with this + def clientFromCredentials(credentials: Session[C,A]#Identifier): Client[C, A] = { + new Client[C, A](this.address, credentials, Map[String, AtomicObjectState[A]]()) + } + + def createSession(credentials: C, identifier: Session[C,A]#Identifier): Session[C, A] = { + new Session[C, A](credentials, this, clientFromCredentials(identifier)) + } + + def getAtomicStateWithId(stateId: String): AtomicObjectState[A] = { + return mapToStates(stateId) + } + + def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + this.mapToStates = this.mapToStates + (stateId -> atomicState) + } + + def receivedState(stateId: String, state: AtomicObjectState[A]): Unit ={ + val clientRDO = new RdObject[A](state) + val serverRDO = new RdObject[A](mapToStates(stateId)) + val ancestor = new CommonAncestor[A](serverRDO, clientRDO) + if (ancestor == serverRDO) deliveredState(stateId, state) + else { + //FiXME: Conflict resolution and history diff stuff + } + } +} + +class HTTPServer[A]( + val port: Int = 8888, + _mapToClients: Map[Session[OAuth2Credentials, A]#Identifier, Client[OAuth2Credentials, A]] = Map[Session[OAuth2Credentials, A]#Identifier, Client[OAuth2Credentials, A]](), + _mapToStates: Map[String, AtomicObjectState[A]] = Map[String, AtomicObjectState[A]]() +)(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends Server[OAuth2Credentials, A]("bla", _mapToClients, _mapToStates) { + def Error(msg: String): Json = Json.obj("error" -> Json.fromString(msg)) + + implicit val stateEncoder: Encoder[AtomicObjectState[A]] = Encoder.forProduct1("immutableState")(rdo => (rdo.immutableState)) +// implicit val stateDecoder: Decoder[AtomicObjectState[List[ChatMessage]]] = deriveDecoder[AtomicObjectState[List[ChatMessage]]] + implicit val rdoEncoder: Encoder[RdObject[A]] = Encoder.forProduct1("state")(rdo => (rdo.state)) + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } + +// val mapToState = _mapToStates; + + lazy val Hello: PartialService = { + case GET at "/hello" => + Ok("Hello World, from Rover!") +// case _ => +// NotFound + } + + val oauthDataHandler = new OAuthDataHandler() + lazy val OAuthApi: PartialService = { + // Nothing special here, but look how we handle the 404 case. + case GET at "/auth" => + println(s"Handling auth!") + IO.fromFuture[Response] { + IO { + async { + val request = new AuthorizationRequest( + Map("Authorization" -> Seq("Basic Y2xpZW50X2lkX3ZhbHVlOmNsaWVudF9zZWNyZXRfdmFsdWU=")), + Map("grant_type" -> Seq("password"), "username" -> Seq("steffan"), "password" -> Seq("pass"), "scope" -> Seq("all"))) + + // val dataHandler = + val f = new TokenEndpoint { + override val handlers = Map( + "password" -> new Password()) + }.handleRequest(request, oauthDataHandler) + + // async { + await(f) match { + case Left(x) => InternalServerError(Error(x.description)) + case Right(x) => Ok(x.accessToken) + // } + + // whenReady(f) { result => result should be('right) } + } } + } + } + +// IO { +// val request = new AuthorizationRequest( +// Map("Authorization" -> Seq("Basic Y2xpZW50X2lkX3ZhbHVlOmNsaWVudF9zZWNyZXRfdmFsdWU=")), +// Map("grant_type" -> Seq("password"), "username" -> Seq("steffan"), "password" -> Seq("pass"), "scope" -> Seq("all"))) +// +//// val dataHandler = +// val f = new TokenEndpoint { +// override val handlers = Map( +// "password" -> new Password()) +// }.handleRequest(request, oauthDataHandler) +// +//// async { +// await(f) match { +// case Left(x) => InternalServerError(Error(x.description)) +// case Right(x) => Ok(x.accessToken) +//// } +// +// // whenReady(f) { result => result should be('right) } +// } +// +// Ok("Test") +// } + + } + + lazy val Api: PartialService = { + // Nothing special here, but look how we handle the 404 case. + case GET at url"/api/rdo/$id" => + println(mapToStates) + println(s"Getting $id from $mapToStates") + IO { + Try(id).toOption.flatMap(mapToStates.get).map { state => + Ok(state.asJson) + }.getOrElse { + NotFound(Error(s"No rdo found for id: `$id'")) + } + } + + case request @ POST at url"/api/rdo/$id" => + println(s"Handling POST request for $id") + Try(id).toOption.flatMap(mapToStates.get).map { state => + // TODO: Decide what to return here +// val existing: RdObject[A] = new RdObject[A](state) + request.readAs[Json].map { jsonBody => + println(s"Request string: ${jsonBody.toString()}") + val updatedState = jsonBody.as[AtomicObjectState[A]] + if (updatedState.isLeft) { + val failure = updatedState.left.get + InternalServerError(Error(s"${failure.message}: \n${failure.history.mkString("\n")}")) + } else { + val updated = updatedState.right.get + deliveredState(id, updated) + Ok(new RdObject(updated).asJson) + } + // val updatedRDO: = state.copy( +// text = root.text.string.getOption(jsonBody).getOrElse(state.text), +// done = root.done.boolean.getOption(jsonBody).getOrElse(state.done) +// ) + } + }.getOrElse { + NotFound(Error(s"No rdo found for id: `$id'")) + } + + } + + def start(): Unit = { +// val + println(s"Starting server at port $port") + lol.http.Server.listen(port)(Hello.orElse(OAuthApi.orElse(Api)).orElse { case _ => NotFound }) + +// async { +// while(true) {} +// } + } +} + +object Server { +// val CHAT_STATE = AtomicObjectState.initial(List[Any]()) + def fromAddress[C, A](address: String): Server[C, A] = { + return new Server[C, A](address, Map[Session[C, A]#Identifier, Client[C,A]](), Map[String, AtomicObjectState[A]]()) + } + + def getMapOfServer[C, A](server: Server[C, A]): Map[String, AtomicObjectState[A]] ={ + return server.mapToStates + } +} + +object Crypto { + def generateToken(): String = { + return "token" + } +} + +class OAuthDataHandler[User](identifier: OAuthDataHandler[User]#UserIdentifier = (user: User) => user.toString) extends DataHandler[User] { + type ID = String + type UserIdentifier = User => ID + + val _user: User = ChatUser.Steffan.asInstanceOf[User] + val _access_token: scalaoauth2.provider.AccessToken = scalaoauth2.provider.AccessToken("access_token!", Some("refresh_token"), Some("scope"), Some(3600.toLong), new Date()) + val _auth_info: scalaoauth2.provider.AuthInfo[User] = AuthInfo[User](_user, Some("chat"), Some("scope"), Some("")) + + override def validateClient(maybeCredential: Option[ClientCredential], request: AuthorizationRequest): Future[Boolean] = { + async { true } + } + + def deleteAuthCode(code: String): scala.concurrent.Future[Unit] = { + async { + + } + } + + def findUser(maybeCredential: Option[scalaoauth2.provider.ClientCredential],request: scalaoauth2.provider.AuthorizationRequest): scala.concurrent.Future[Option[User]] = { + async { + Some(_user) + } + } + +// def validateClient(clientId: String, clientSecret: String, grantType: String): Boolean = { +// true +// // Clients.validate(clientId, clientSecret, grantType) +// } + +// override def findUser(maybeCredential: Option[ClientCredential], request: AuthorizationRequest): Future[Option[User]] = + def findUser(username: String, password: String): Future[Option[User]] = { + async { + Some { + if (username == "steffan") ChatUser.Steffan.asInstanceOf[User] + else if (username == "giannis") ChatUser.Giannis.asInstanceOf[User] + else new ChatUser(username).asInstanceOf[User] + } + } + // Users.findUser(username, password) + } + + def createAccessToken(authInfo: AuthInfo[User]): Future[scalaoauth2.provider.AccessToken] = { +// val accessTokenExpiresIn = 60 * 60 // 1 hour +// val now = new Date() +// val createdAt = new Timestamp(now.getTime) +// val refreshToken = Some(Crypto.generateToken()) +// val accessToken = Crypto.generateToken() +// val userId = identifier(authInfo.user) + + async { + _access_token + } + +// val tokenObject = models.AccessToken(accessToken, refreshToken, userId, authInfo.scope, accessTokenExpiresIn, createdAt, authInfo.clientId) +// AccessTokens.deleteExistingAndCreate(tokenObject, userId, authInfo.clientId) +// scalaoauth2.provider.AccessToken(accessToken, refreshToken, authInfo.scope, Some(accessTokenExpiresIn.toLong), now) + } + + def getStoredAccessToken(authInfo: AuthInfo[User]): Future[Option[scalaoauth2.provider.AccessToken]] = { + val userId = identifier(authInfo.user) + async { + Some(_access_token) + } + // AccessTokens.findToken(userId, authInfo.clientId) map { a => +// scalaoauth2.provider.AccessToken(a.accessToken, a.refreshToken, a.scope, Some(a.expiresIn.toLong), a.createdAt) +// } + } + + def refreshAccessToken(authInfo: AuthInfo[User], refreshToken: String): Future[scalaoauth2.provider.AccessToken] = { +// createAccessToken(authInfo) + async { + _access_token + } + } + + def findClientUser(clientId: String, clientSecret: String, scope: Option[String]): Option[User] = { + None // Not implemented yet + } + + def findAccessToken(token: String): Future[Option[scalaoauth2.provider.AccessToken]] = { + async { Some(_access_token) } + // AccessTokens.findAccessToken(token) map { a => +// scalaoauth2.provider.AccessToken(a.accessToken, a.refreshToken, a.scope, Some(a.expiresIn.toLong), a.createdAt) +// } + } + + def findAuthInfoByAccessToken(accessToken: scalaoauth2.provider.AccessToken): Future[Option[AuthInfo[User]]] = { + async { + Some(_auth_info) + } +// AccessTokens.findAccessToken(accessToken.token) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, Some("")) +// } + } + + def findAuthInfoByRefreshToken(refreshToken: String): Future[Option[AuthInfo[User]]] = { + async { + Some(_auth_info) + } + // AccessTokens.findRefreshToken(refreshToken) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, Some("")) +// } + } + + def findAuthInfoByCode(code: String): Future[Option[AuthInfo[User]]] = { + async { Some(_auth_info) } + // AuthCodes.find(code) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, a.redirectUri) +// } + } + +} \ No newline at end of file diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala new file mode 100644 index 0000000..186ff77 --- /dev/null +++ b/src/main/scala/rover/Session.scala @@ -0,0 +1,61 @@ +package rover + +import chatapp.ChatServer +import rover.rdo.AtomicObjectState +import rover.rdo.client.RdObject + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} + +class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) { + type Id = String + type Identifier = C => Id + + // TODO: Move this to the proper place + type ObjectId = Id + + + // TODO: Implement errors + def importRDO(objectId: ObjectId): Future[AtomicObjectState[A]] = { + async{ + if (objectId == "chat"){ + ChatServer.CHAT_STATE.asInstanceOf[AtomicObjectState[A]] + } + else null + } + } + + def importRDOwithState[A](objectId: ObjectId, stateId: String): Future[Unit] = { + // TODO: Hacks + async { + if (objectId == "chat") { + val atomicState = server.getAtomicStateWithId(stateId) + client.appendedState(stateId, atomicState) + } +// else null + } + } + + def exportRDOwithState[A](stateId: String): Future[Unit] = { + async{ + val atomicState = client.getAtomicStateWithId(stateId) + server.receivedState(stateId, atomicState) + } + } + + +} + +object Session { + // TODO: Probably better to make this mutable? It needs to be persistent in any case + private var _CACHE = Map[Session[Any, Any]#Id, Session[Any, Any]]() + + def get[C, A](sessionId: Session[C, A]#Id): Session[C, A] = { + _CACHE.get(sessionId).asInstanceOf[Session[C, A]] + } + + def set[C, A](sessionId: Session[C, A]#Id, session: Session[C, A]): Unit = { + _CACHE = _CACHE.updated(sessionId, session.asInstanceOf[Session[Any, Any]]) + } +} \ No newline at end of file diff --git a/src/main/scala/rover/rdo/AtomicObjectState.scala b/src/main/scala/rover/rdo/AtomicObjectState.scala index f43c0b6..9a0a0c3 100644 --- a/src/main/scala/rover/rdo/AtomicObjectState.scala +++ b/src/main/scala/rover/rdo/AtomicObjectState.scala @@ -2,10 +2,17 @@ package rover.rdo import rover.rdo.client.{Log, LogRecord} +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Decoder, Json} + + // TODO: make ctor private -class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A]) extends ObjectState { +class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A])(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends ObjectState { type Op = A => A + def encoded: Json = encodeA(value) +// def decoded: A = decodeA() + def immutableState: A = value def applyOp(operation: Op): AtomicObjectState[A] = { @@ -27,17 +34,58 @@ class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A]) e } } + def getLog: Log[A] = { + return this.log + } + override def toString: String = { value.toString } + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } +// implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { +// final def apply(c: HCursor)(implicit encodeA: Encoder[A], decodeA: Decoder[A]): Decoder.Result[AtomicObjectState[A]] = +// for { +// immutableState <- c.downField("immutableState").as[A] +// } yield { +// AtomicObjectState.initial(immutableState) +// } +// } } object AtomicObjectState { - def initial[A](value: A): AtomicObjectState[A] = { + def initial[A](value: A)(implicit encodeA: Encoder[A], decodeA: Decoder[A]) : AtomicObjectState[A] = { return new AtomicObjectState[A](value, Log.withInitialState(value)) } - def fromLog[A](log: Log[A]): AtomicObjectState[A] = { +// def initial[A](value: List[A]): AtomicObjectState[A] = { +// return new AtomicObjectState[A](value.last, Log.withInitialStates(value.map(v => LogRecord[A](v, null, v)))) +// } + + def fromLog[A](log: Log[A])(implicit encodeA: Encoder[A], decodeA: Decoder[A]): AtomicObjectState[A] = { return new AtomicObjectState[A](log.asList.last.stateResult, log) } + +// implicit val encodeAtomicObjectState: Encoder[AtomicObjectState[_]] = new Encoder[AtomicObjectState[_]] { +// final def apply(s: AtomicObjectState[_]): Json = Json.obj( +// ("", s.asJson) +// ) +// } + +// implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[_]] = new Decoder[AtomicObjectState[_]] { +// final def apply[A](c: HCursor)(implicit encodeA: Encoder[A], decodeA: Decoder[A]): Decoder.Result[AtomicObjectState[A]] = +// for { +// immutableState <- c.downField("immutableState").as[A] +// } yield { +// AtomicObjectState.initial(immutableState) +// } +// } + } \ No newline at end of file diff --git a/src/main/scala/rover/rdo/client/Log.scala b/src/main/scala/rover/rdo/client/Log.scala index 9907ad1..91916d6 100644 --- a/src/main/scala/rover/rdo/client/Log.scala +++ b/src/main/scala/rover/rdo/client/Log.scala @@ -16,6 +16,11 @@ class Log[A](private val logList: List[LogRecord[A]] = List()) { return new Log[A](list) } + def concatenated(listOfRecords: List[LogRecord[A]]): Log[A] = { + val list = this.logList ++ listOfRecords + return new Log[A](list) + } + def asList: List[LogRecord[A]] = { return logList } @@ -30,4 +35,8 @@ object Log { def withInitialState[A](a: A): Log[A] = { return new Log[A]().appended(LogRecord[A](a,null, a)) } + + def withInitialStates[A](a: List[LogRecord[A]]): Log[A] = { + return new Log[A]().concatenated(a) + } } diff --git a/src/main/scala/rover/rdo/client/RdObject.scala b/src/main/scala/rover/rdo/client/RdObject.scala index 5769b1e..f50200b 100644 --- a/src/main/scala/rover/rdo/client/RdObject.scala +++ b/src/main/scala/rover/rdo/client/RdObject.scala @@ -2,6 +2,10 @@ package rover.rdo.client import rover.rdo.AtomicObjectState +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async} +import scala.concurrent.{Promise, Future} + //FIXME: use hashes instead of Longs/Strings? class RdObject[A](var state: AtomicObjectState[A]) { @@ -9,6 +13,13 @@ class RdObject[A](var state: AtomicObjectState[A]) { protected final def modifyState(op: AtomicObjectState[A]#Op): Unit = { state = state.applyOp(op) +// onStateModified(state) + } + + protected def onStateModified(oldState: AtomicObjectState[A]): Future[Unit] = { + async { + + } } protected final def immutableState: A = { @@ -29,6 +40,10 @@ class RdObject[A](var state: AtomicObjectState[A]) { * @param other Some other RDO */ class CommonAncestor[A](private val one: RdObject[A], private val other: RdObject[A]) extends RdObject[A](null) { // todo: fixme with a deferred state + override def onStateModified(oldState: AtomicObjectState[A]): Future[Unit] = { +// Promise() completeWith async { } + async {} + } // determine it once and defer all RdObject methods to it def commonAncestor: RdObject[A] = { @@ -45,8 +60,8 @@ class CommonAncestor[A](private val one: RdObject[A], private val other: RdObjec val indexOfI = one.state.log.asList.indexOf(i) val logRecordsUpToI = one.state.log.asList.slice(0, indexOfI+1) val logUpToI = new Log[A](logRecordsUpToI) - val ancestor = new RdObject[A](AtomicObjectState.fromLog(logUpToI)) - return ancestor +// val ancestor = new RdObject[A](AtomicObjectState.fromLog(logUpToI)) +// return ancestor } } } diff --git a/src/main/scala/votingapp/Poll.scala b/src/main/scala/votingapp/Poll.scala index 890bd56..ff3083d 100644 --- a/src/main/scala/votingapp/Poll.scala +++ b/src/main/scala/votingapp/Poll.scala @@ -1,99 +1,99 @@ -package votingapp - -import rover.rdo.AtomicObjectState -import rover.rdo.client.{CommonAncestor, RdObject} - -class Poll(val question: String, val choices: List[PollChoice], state: AtomicObjectState[Votes]) extends RdObject[Votes](state) { - - def cast(vote: PollChoice): Unit = { - modifyState(votes => votes.add(vote)) - } - - def result: PollResult = { - val votesState = this.immutableState - return new PollResult(votesState) - } - - override def toString: String = immutableState.toString - -// override def currentVersion: Long = 0 -// override def stableVersion: Long = 0 -} - -object Poll { - def apply(question: String, choices: List[PollChoice]): Poll = { - return new Poll(question, choices, AtomicObjectState.initial(Votes(choices))) - } - - def copyOf(poll: Poll): Poll = { - return new Poll(poll.question, poll.choices, poll.state) - } -} - -class PollResult(private val votes: Votes) { - def winner: PollChoice = { - votes.majorityChoice - } -} - -case class PollChoice(choice: String) - -class Votes(val map: Map[PollChoice, Int]) { - /** - * Adds the given poll choice to the votes cast. Also: immutable object pattern. - * @param vote The vote-choice to cast - * @return New state with the vote added - */ - def add(vote: PollChoice): Votes = { - Votes(map updated (vote, map(vote) + 1)) - } - - def majorityChoice: PollChoice = { - // FIXME: ties, idea: return a "poll-result" not the choice - val winner = map.maxBy(_._2)._1 - return winner - } - - override def toString: String = { - map.toString - } -} - -object Votes { - def apply(choices: List[PollChoice]): Votes= { - // TODO: maybe remove, not needed anymore... we accept any choice? Or enforce only valid choices - return new Votes(choices.map(choice => (choice,0)).toMap.withDefaultValue(0)) - } - - def apply(votes: Map[PollChoice, Int]): Votes = { - return new Votes(votes.withDefaultValue(0)) - } -} - -object henk { - def main(args: Array[String]): Unit = { - val poll = Poll("Does this work", List(PollChoice("Yes"), PollChoice("No"), PollChoice("I hope so"), PollChoice("Yes"))) - println(poll) - - val poll2 = Poll.copyOf(poll) - println(poll2) - - println("\ncasting votes:") - - poll.cast(PollChoice("Yes")) - poll.cast(PollChoice("Yes")) - poll.cast(PollChoice("No")) - - poll2.cast(PollChoice("No")) - poll2.cast(PollChoice("No")) - println("Poll:" + poll) - println("Poll2:" + poll2) - - val parent = new CommonAncestor[Votes](poll, poll2) - println("Parent:" + parent.toString) - println(poll.result.winner) - println("Immutable state:" + poll.toString) - - - } -} \ No newline at end of file +//package votingapp +// +//import rover.rdo.AtomicObjectState +//import rover.rdo.client.{CommonAncestor, RdObject} +// +//class Poll(val question: String, val choices: List[PollChoice], state: AtomicObjectState[Votes]) extends RdObject[Votes](state) { +// +// def cast(vote: PollChoice): Unit = { +// modifyState(votes => votes.add(vote)) +// } +// +// def result: PollResult = { +// val votesState = this.immutableState +// return new PollResult(votesState) +// } +// +// override def toString: String = immutableState.toString +// +//// override def currentVersion: Long = 0 +//// override def stableVersion: Long = 0 +//} +// +//object Poll { +// def apply(question: String, choices: List[PollChoice]): Poll = { +// return new Poll(question, choices, AtomicObjectState.initial(Votes(choices))) +// } +// +// def copyOf(poll: Poll): Poll = { +// return new Poll(poll.question, poll.choices, poll.state) +// } +//} +// +//class PollResult(private val votes: Votes) { +// def winner: PollChoice = { +// votes.majorityChoice +// } +//} +// +//case class PollChoice(choice: String) +// +//class Votes(val map: Map[PollChoice, Int]) { +// /** +// * Adds the given poll choice to the votes cast. Also: immutable object pattern. +// * @param vote The vote-choice to cast +// * @return New state with the vote added +// */ +// def add(vote: PollChoice): Votes = { +// Votes(map updated (vote, map(vote) + 1)) +// } +// +// def majorityChoice: PollChoice = { +// // FIXME: ties, idea: return a "poll-result" not the choice +// val winner = map.maxBy(_._2)._1 +// return winner +// } +// +// override def toString: String = { +// map.toString +// } +//} +// +//object Votes { +// def apply(choices: List[PollChoice]): Votes= { +// // TODO: maybe remove, not needed anymore... we accept any choice? Or enforce only valid choices +// return new Votes(choices.map(choice => (choice,0)).toMap.withDefaultValue(0)) +// } +// +// def apply(votes: Map[PollChoice, Int]): Votes = { +// return new Votes(votes.withDefaultValue(0)) +// } +//} +// +//object henk { +// def main(args: Array[String]): Unit = { +// val poll = Poll("Does this work", List(PollChoice("Yes"), PollChoice("No"), PollChoice("I hope so"), PollChoice("Yes"))) +// println(poll) +// +// val poll2 = Poll.copyOf(poll) +// println(poll2) +// +// println("\ncasting votes:") +// +// poll.cast(PollChoice("Yes")) +// poll.cast(PollChoice("Yes")) +// poll.cast(PollChoice("No")) +// +// poll2.cast(PollChoice("No")) +// poll2.cast(PollChoice("No")) +// println("Poll:" + poll) +// println("Poll2:" + poll2) +// +// val parent = new CommonAncestor[Votes](poll, poll2) +// println("Parent:" + parent.toString) +// println(poll.result.winner) +// println("Immutable state:" + poll.toString) +// +// +// } +//} \ No newline at end of file