From d50c5c45be43632a8627480496e35df56eadf822 Mon Sep 17 00:00:00 2001 From: Steffan Sluis Date: Mon, 18 Mar 2019 13:09:08 +0100 Subject: [PATCH 1/9] Chat client --- build.gradle | 19 +- src/main/scala/chatapp/Chat.scala | 147 +++++++++---- src/main/scala/chatapp/REPL.scala | 66 ++++++ .../scala/rover/rdo/client/RdObject.scala | 16 +- src/main/scala/votingapp/Poll.scala | 198 +++++++++--------- 5 files changed, 299 insertions(+), 147 deletions(-) create mode 100644 src/main/scala/chatapp/REPL.scala diff --git a/build.gradle b/build.gradle index 24cde6e..3c5a069 100644 --- a/build.gradle +++ b/build.gradle @@ -1,14 +1,19 @@ plugins { id 'scala' + id 'idea' id "com.github.maiflai.scalatest" version "0.24" } repositories { mavenCentral() + jcenter() } dependencies { implementation 'org.scala-lang:scala-library:2.12.8' + implementation 'org.scala-lang.modules:scala-async_2.12:0.9.7' + compile 'com.monovore:decline_2.12:0.6.1' + compile 'org.typelevel:cats-core_2.12:1.5.0' testImplementation 'org.pegdown:pegdown:1.4.2' testImplementation 'org.scalatest:scalatest_2.12:3.0.6' testImplementation 'junit:junit:4.12' @@ -16,4 +21,16 @@ dependencies { task wrapper(type: Wrapper) { gradleVersion = '4.10' -} \ No newline at end of file +} + +idea{ + module { + testSourceDirs += sourceSets.main.runtimeClasspath + } +} + +task run(type: JavaExec, dependsOn: classes) { + main = 'chatapp.Chat' + standardInput = System.in + classpath sourceSets.main.runtimeClasspath +} diff --git a/src/main/scala/chatapp/Chat.scala b/src/main/scala/chatapp/Chat.scala index f9cec1c..11d463f 100644 --- a/src/main/scala/chatapp/Chat.scala +++ b/src/main/scala/chatapp/Chat.scala @@ -2,68 +2,127 @@ package chatapp import rover.rdo.AtomicObjectState import rover.rdo.client.RdObject +import cats.implicits._ +import com.monovore.decline._ -// FIXME: ensure messages can be read, but not modified or reassigned... -// FIXME: after state & rd object impl change -class Chat(var messages: List[ChatMessage]) extends RdObject[String](AtomicObjectState.initial("")) { +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration._ - // TODO: Something with users, crypto stuff on identity - // FIXME: Hashes should be used here as well as user ids - private var users = Map[Long, String]() +class User(val username: String) { - def this(messages: List[ChatMessage], users: Map[Long, String]) { - this(messages) - this.users = users - } +} - def addUser(id: Long, uri: String): Unit = { - if (!this.users.contains(id)) this.users updated (id, uri) - // FIXME: otherwise we can also update, a user's uri - else println("Already existing user") - } +object User { + val Steffan = new User("steffan") + val Giannis = new User("giannis") +} + +class ChatMessage(val body: String, + val author: User, + val timestamp: Long = java.time.Instant.now.getEpochSecond()) { - def removeUser(id : Long)= { - if(users.contains(id)) users = users - id - else println("Non-exsiting id given") + override def toString: String = { + s"${author.username}: $body" } +} + +// FIXME: ensure messages can be read, but not modified or reassigned... +// FIXME: after state & rd object impl change +class Chat(_onStateModified: Chat#Updater) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { + type Updater = AtomicObjectState[List[ChatMessage]] => Promise[Unit] - def appendMessage(chatMessage: ChatMessage): Unit = { - messages = messages :+ chatMessage + def send(message: ChatMessage): Promise[Unit]= { + val op: AtomicObjectState[List[ChatMessage]]#Op = s => s :+ message + + Promise() completeWith async { + modifyState(op) + } } - def terminate(): Unit ={ - //TODO: terminate + override def onStateModified(oldState: AtomicObjectState[List[ChatMessage]]): Promise[Unit] = { + _onStateModified(state) } def currentVersion(): Long = { - messages.size + immutableState.size } -// override def stableVersion: Long = { -// // TODO: the stable version (last committed) -// messages.size -// } +} +object Chat { + val SIZE = 10 + + def client(serverAddress: String, user: User): Unit = { + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") - def printMessages(): Unit = { - for (i <- messages) { - println(s"body: ${i.body}, author: ${i.author}, time: ${i.timestamp}") + s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" } - } -} -class ChatMessage(val body: String, - val author: String, - val timestamp: Long = java.time.Instant.now.getEpochSecond()) -{ + // TODO: This is hacky, figure out a better way to do this + val updater: Chat#Updater = state => Promise() completeWith async { + val text = state.immutableState.takeRight(SIZE).map(m => m.toString()).mkString("\n") + print(s"${printer(text)}") + } -} + val chat = new Chat(updater) + println(" Welcome to Rover Chat!") + print((1 to SIZE).map(i => "\n").mkString("")) + + // Simulate conversation + async { + Thread.sleep(3000) + await(chat.send(new ChatMessage("Hey man!", User.Giannis)).future) + updater(chat.state) + + Thread.sleep(3000) + await(chat.send(new ChatMessage("How's it going?", User.Giannis)).future) + updater(chat.state) + + Thread.sleep(10000) + await(chat.send(new ChatMessage("Yea man I'm good", User.Giannis)).future) + updater(chat.state) + } + + val reader = () => { + print("> ") + val s = scala.io.StdIn.readLine() + s + } + val executor = (input: String) => { + async { + val p = chat.send(new ChatMessage(input, user)) + await(p.future) + // This clears the input line + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") + chat.immutableState.takeRight(SIZE).map(m => s"${m.toString()}").mkString("\n") + } + } + val repl: REPL[String] = new REPL(reader, executor, printer) + Await.result(repl.loop(), Duration.Inf) + } -object chatFoo{ - def main(args: Array[String]): Unit ={ - val chat = new Chat(messages = List[ChatMessage]()) - chat.addUser(1234, "tt") - val m: ChatMessage = new ChatMessage("test message", "Ioa") - chat.appendMessage(m) - chat.printMessages() + def main(args: Array[String]): Unit = { + client("bla", User.Steffan) } } + +// TODO: Add client and server subcommands +object ChatCLI extends CommandApp( + name = "rover-chat", + header = "Says hello!", + main = { + val userOpt = + Opts.option[String]("target", help = "Person to greet.") .withDefault("world") + + val quietOpt = Opts.flag("quiet", help = "Whether to be quiet.").orFalse + + (userOpt, quietOpt).mapN { (user, quiet) => + if (quiet) println("...") + else println(s"Hello $user!") + } + } +) diff --git a/src/main/scala/chatapp/REPL.scala b/src/main/scala/chatapp/REPL.scala new file mode 100644 index 0000000..649c432 --- /dev/null +++ b/src/main/scala/chatapp/REPL.scala @@ -0,0 +1,66 @@ +package chatapp + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Await, Future, Promise} +import scala.async.Async.{async, await} + + +class REPL[A](reader: REPL[A]#Reader, executor: REPL[A]#Executor, printer: REPL[A]#Printer) { + type Reader = () => String + type Executor = String => Future[A] + type Printer = A => String + + def loop(): Future[Unit] = { + var looping = true + async { + while(looping) { + val input = reader() + + if (input == null || input == "/exit") { + looping = false + } else if(input == "") { + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}\r") + } else { + val result = await(executor(input)) + val output = printer(result) + + print(output) + } + } + } + } + +} + +object REPL { + val ERASE_SCREEN_AFTER="\u001b[0J" + val ERASE_LINE_BEFORE="\u001b[1K" + val ERASE_LINE_AFTER="\u001b[0K" + + val HOME="\u001b[1H" + val UP="\u001b[1A" + val DOWN="\u001b[1B" + val FORWARD="\u001b[1C" + val BACKWARD="\u001b[1D" + + val SAVE_CURSOR="\u001b[0s" + val RESTORE_CURSOR="\u001b[0u" + + def main(args: Array[String]): Unit = { + val reader = () => { + print(s"${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_SCREEN_AFTER}\r> ") + scala.io.StdIn.readLine() + } + val executor = (input: String) => { + async { input } + } + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => UP).mkString("")}$ERASE_LINE_BEFORE$ERASE_SCREEN_AFTER\r" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") + s"$cls$text\n" + } + val repl = new REPL(reader, executor, printer) + repl.loop() + } +} diff --git a/src/main/scala/rover/rdo/client/RdObject.scala b/src/main/scala/rover/rdo/client/RdObject.scala index 5769b1e..13add53 100644 --- a/src/main/scala/rover/rdo/client/RdObject.scala +++ b/src/main/scala/rover/rdo/client/RdObject.scala @@ -2,15 +2,22 @@ package rover.rdo.client import rover.rdo.AtomicObjectState +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async} +import scala.concurrent.{Promise} + //FIXME: use hashes instead of Longs/Strings? -class RdObject[A](var state: AtomicObjectState[A]) { +abstract class RdObject[A](var state: AtomicObjectState[A]) { // TODO: "is up to date" or "version" methods protected final def modifyState(op: AtomicObjectState[A]#Op): Unit = { state = state.applyOp(op) +// onStateModified(state) } + protected def onStateModified(oldState: AtomicObjectState[A]): Promise[Unit] + protected final def immutableState: A = { return state.immutableState } @@ -29,6 +36,9 @@ class RdObject[A](var state: AtomicObjectState[A]) { * @param other Some other RDO */ class CommonAncestor[A](private val one: RdObject[A], private val other: RdObject[A]) extends RdObject[A](null) { // todo: fixme with a deferred state + override def onStateModified(oldState: AtomicObjectState[A]): Promise[Unit] = { + Promise() completeWith async { } + } // determine it once and defer all RdObject methods to it def commonAncestor: RdObject[A] = { @@ -45,8 +55,8 @@ class CommonAncestor[A](private val one: RdObject[A], private val other: RdObjec val indexOfI = one.state.log.asList.indexOf(i) val logRecordsUpToI = one.state.log.asList.slice(0, indexOfI+1) val logUpToI = new Log[A](logRecordsUpToI) - val ancestor = new RdObject[A](AtomicObjectState.fromLog(logUpToI)) - return ancestor +// val ancestor = new RdObject[A](AtomicObjectState.fromLog(logUpToI)) +// return ancestor } } } diff --git a/src/main/scala/votingapp/Poll.scala b/src/main/scala/votingapp/Poll.scala index 890bd56..ff3083d 100644 --- a/src/main/scala/votingapp/Poll.scala +++ b/src/main/scala/votingapp/Poll.scala @@ -1,99 +1,99 @@ -package votingapp - -import rover.rdo.AtomicObjectState -import rover.rdo.client.{CommonAncestor, RdObject} - -class Poll(val question: String, val choices: List[PollChoice], state: AtomicObjectState[Votes]) extends RdObject[Votes](state) { - - def cast(vote: PollChoice): Unit = { - modifyState(votes => votes.add(vote)) - } - - def result: PollResult = { - val votesState = this.immutableState - return new PollResult(votesState) - } - - override def toString: String = immutableState.toString - -// override def currentVersion: Long = 0 -// override def stableVersion: Long = 0 -} - -object Poll { - def apply(question: String, choices: List[PollChoice]): Poll = { - return new Poll(question, choices, AtomicObjectState.initial(Votes(choices))) - } - - def copyOf(poll: Poll): Poll = { - return new Poll(poll.question, poll.choices, poll.state) - } -} - -class PollResult(private val votes: Votes) { - def winner: PollChoice = { - votes.majorityChoice - } -} - -case class PollChoice(choice: String) - -class Votes(val map: Map[PollChoice, Int]) { - /** - * Adds the given poll choice to the votes cast. Also: immutable object pattern. - * @param vote The vote-choice to cast - * @return New state with the vote added - */ - def add(vote: PollChoice): Votes = { - Votes(map updated (vote, map(vote) + 1)) - } - - def majorityChoice: PollChoice = { - // FIXME: ties, idea: return a "poll-result" not the choice - val winner = map.maxBy(_._2)._1 - return winner - } - - override def toString: String = { - map.toString - } -} - -object Votes { - def apply(choices: List[PollChoice]): Votes= { - // TODO: maybe remove, not needed anymore... we accept any choice? Or enforce only valid choices - return new Votes(choices.map(choice => (choice,0)).toMap.withDefaultValue(0)) - } - - def apply(votes: Map[PollChoice, Int]): Votes = { - return new Votes(votes.withDefaultValue(0)) - } -} - -object henk { - def main(args: Array[String]): Unit = { - val poll = Poll("Does this work", List(PollChoice("Yes"), PollChoice("No"), PollChoice("I hope so"), PollChoice("Yes"))) - println(poll) - - val poll2 = Poll.copyOf(poll) - println(poll2) - - println("\ncasting votes:") - - poll.cast(PollChoice("Yes")) - poll.cast(PollChoice("Yes")) - poll.cast(PollChoice("No")) - - poll2.cast(PollChoice("No")) - poll2.cast(PollChoice("No")) - println("Poll:" + poll) - println("Poll2:" + poll2) - - val parent = new CommonAncestor[Votes](poll, poll2) - println("Parent:" + parent.toString) - println(poll.result.winner) - println("Immutable state:" + poll.toString) - - - } -} \ No newline at end of file +//package votingapp +// +//import rover.rdo.AtomicObjectState +//import rover.rdo.client.{CommonAncestor, RdObject} +// +//class Poll(val question: String, val choices: List[PollChoice], state: AtomicObjectState[Votes]) extends RdObject[Votes](state) { +// +// def cast(vote: PollChoice): Unit = { +// modifyState(votes => votes.add(vote)) +// } +// +// def result: PollResult = { +// val votesState = this.immutableState +// return new PollResult(votesState) +// } +// +// override def toString: String = immutableState.toString +// +//// override def currentVersion: Long = 0 +//// override def stableVersion: Long = 0 +//} +// +//object Poll { +// def apply(question: String, choices: List[PollChoice]): Poll = { +// return new Poll(question, choices, AtomicObjectState.initial(Votes(choices))) +// } +// +// def copyOf(poll: Poll): Poll = { +// return new Poll(poll.question, poll.choices, poll.state) +// } +//} +// +//class PollResult(private val votes: Votes) { +// def winner: PollChoice = { +// votes.majorityChoice +// } +//} +// +//case class PollChoice(choice: String) +// +//class Votes(val map: Map[PollChoice, Int]) { +// /** +// * Adds the given poll choice to the votes cast. Also: immutable object pattern. +// * @param vote The vote-choice to cast +// * @return New state with the vote added +// */ +// def add(vote: PollChoice): Votes = { +// Votes(map updated (vote, map(vote) + 1)) +// } +// +// def majorityChoice: PollChoice = { +// // FIXME: ties, idea: return a "poll-result" not the choice +// val winner = map.maxBy(_._2)._1 +// return winner +// } +// +// override def toString: String = { +// map.toString +// } +//} +// +//object Votes { +// def apply(choices: List[PollChoice]): Votes= { +// // TODO: maybe remove, not needed anymore... we accept any choice? Or enforce only valid choices +// return new Votes(choices.map(choice => (choice,0)).toMap.withDefaultValue(0)) +// } +// +// def apply(votes: Map[PollChoice, Int]): Votes = { +// return new Votes(votes.withDefaultValue(0)) +// } +//} +// +//object henk { +// def main(args: Array[String]): Unit = { +// val poll = Poll("Does this work", List(PollChoice("Yes"), PollChoice("No"), PollChoice("I hope so"), PollChoice("Yes"))) +// println(poll) +// +// val poll2 = Poll.copyOf(poll) +// println(poll2) +// +// println("\ncasting votes:") +// +// poll.cast(PollChoice("Yes")) +// poll.cast(PollChoice("Yes")) +// poll.cast(PollChoice("No")) +// +// poll2.cast(PollChoice("No")) +// poll2.cast(PollChoice("No")) +// println("Poll:" + poll) +// println("Poll2:" + poll2) +// +// val parent = new CommonAncestor[Votes](poll, poll2) +// println("Parent:" + parent.toString) +// println(poll.result.winner) +// println("Immutable state:" + poll.toString) +// +// +// } +//} \ No newline at end of file From 6ed49df906a4c0e6bab1a2f96d92ce32c6e13fcb Mon Sep 17 00:00:00 2001 From: Steffan Sluis Date: Wed, 20 Mar 2019 03:06:05 +0100 Subject: [PATCH 2/9] Servers and clients --- src/main/scala/chatapp/Chat.scala | 90 +++++++++++++------ src/main/scala/rover/Client.scala | 19 ++++ src/main/scala/rover/Server.scala | 18 ++++ src/main/scala/rover/Session.scala | 47 ++++++++++ .../scala/rover/rdo/client/RdObject.scala | 15 ++-- 5 files changed, 159 insertions(+), 30 deletions(-) create mode 100644 src/main/scala/rover/Client.scala create mode 100644 src/main/scala/rover/Server.scala create mode 100644 src/main/scala/rover/Session.scala diff --git a/src/main/scala/chatapp/Chat.scala b/src/main/scala/chatapp/Chat.scala index 11d463f..f4796b5 100644 --- a/src/main/scala/chatapp/Chat.scala +++ b/src/main/scala/chatapp/Chat.scala @@ -4,6 +4,8 @@ import rover.rdo.AtomicObjectState import rover.rdo.client.RdObject import cats.implicits._ import com.monovore.decline._ +import rover.Client.OAuth2Credentials +import rover.{Client, Server, Session} import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async.{async, await} @@ -30,8 +32,9 @@ class ChatMessage(val body: String, // FIXME: ensure messages can be read, but not modified or reassigned... // FIXME: after state & rd object impl change -class Chat(_onStateModified: Chat#Updater) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { - type Updater = AtomicObjectState[List[ChatMessage]] => Promise[Unit] +class Chat(_onStateModified: Chat#Updater, initialState: AtomicObjectState[List[ChatMessage]] = AtomicObjectState.initial(List[ChatMessage]())) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { +// type State = List[ChatMessage] + type Updater = AtomicObjectState[List[ChatMessage]] => Future[Unit] def send(message: ChatMessage): Promise[Unit]= { val op: AtomicObjectState[List[ChatMessage]]#Op = s => s :+ message @@ -41,7 +44,7 @@ class Chat(_onStateModified: Chat#Updater) extends RdObject[List[ChatMessage]](A } } - override def onStateModified(oldState: AtomicObjectState[List[ChatMessage]]): Promise[Unit] = { + override def onStateModified(oldState: AtomicObjectState[List[ChatMessage]]): Future[Unit] = { _onStateModified(state) } @@ -50,27 +53,50 @@ class Chat(_onStateModified: Chat#Updater) extends RdObject[List[ChatMessage]](A } } + object Chat { - val SIZE = 10 + def fromRDO(rdo: RdObject[List[ChatMessage]], _onStateModified: Chat#Updater): Chat = { + new Chat(_onStateModified, rdo.state) + } +} - def client(serverAddress: String, user: User): Unit = { - val printer = (string: String) => { - val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" - // Prepend two spaces to match input indentation of "> " - val text = string.split("\n").map(line => s" $line").mkString("\n") +class ChatServer() extends Server[OAuth2Credentials](null) { - s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" - } +} + +class ChatClient(serverAddress: String) extends Client.OAuth2Client(serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken) { + var session: Session[OAuth2Credentials] = null + var user: User = null + var chat: Chat = null; + + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") + + s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" + } // TODO: This is hacky, figure out a better way to do this - val updater: Chat#Updater = state => Promise() completeWith async { - val text = state.immutableState.takeRight(SIZE).map(m => m.toString()).mkString("\n") - print(s"${printer(text)}") + val updater: Chat#Updater = state => async { + val text = state.immutableState.takeRight(ChatClient.SIZE).map(m => m.toString()).mkString("\n") + print(s"${printer(text)}") + } + + def login(user: User): Future[Unit] = { + async { + val credentials = new OAuth2Credentials("fake credentials", "fake credentials") + this.user = user + session = createSession(credentials) + val rdo = await(session.importRDO[List[ChatMessage]]("chat")) + chat = Chat.fromRDO(rdo, updater) } + } - val chat = new Chat(updater) + def render(): Future[Unit] = { +// val chat = new Chat(updater) println(" Welcome to Rover Chat!") - print((1 to SIZE).map(i => "\n").mkString("")) + print((1 to ChatClient.SIZE).map(i => "\n").mkString("")) // Simulate conversation async { @@ -93,23 +119,37 @@ object Chat { s } val executor = (input: String) => { - async { - val p = chat.send(new ChatMessage(input, user)) - await(p.future) - // This clears the input line - print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") - chat.immutableState.takeRight(SIZE).map(m => s"${m.toString()}").mkString("\n") - } + async { + val p = chat.send(new ChatMessage(input, user)) + await(p.future) + // This clears the input line + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") + chat.state.immutableState.takeRight(ChatClient.SIZE).map(m => s"${m.toString()}").mkString("\n") + } } val repl: REPL[String] = new REPL(reader, executor, printer) - Await.result(repl.loop(), Duration.Inf) +// Await.result(repl.loop(), Duration.Inf) + repl.loop() } +} + +object ChatClient { + val SIZE = 10 + def main(args: Array[String]): Unit = { - client("bla", User.Steffan) + val serverAddress = "bla" + val client = new ChatClient(serverAddress) + val f = async { + await(client.login(User.Steffan)) + await(client.render()) + } + + Await.result(f, Duration.Inf) } } + // TODO: Add client and server subcommands object ChatCLI extends CommandApp( name = "rover-chat", diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala new file mode 100644 index 0000000..fa9c675 --- /dev/null +++ b/src/main/scala/rover/Client.scala @@ -0,0 +1,19 @@ +package rover + +class Client[C](serverAddress: String, identifier: Session[C]#Identifier) { + val server = Server.fromAddress[C](serverAddress) + + def createSession(credentials: C) = { + server.createSession(credentials) + } +} + + +object Client { + class OAuth2Credentials(val accessToken: String, val refreshToken: String) {} + type OAuth2Client = Client[OAuth2Credentials] + + def oauth2(): OAuth2Client = { + null + } +} diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala new file mode 100644 index 0000000..d6d2078 --- /dev/null +++ b/src/main/scala/rover/Server.scala @@ -0,0 +1,18 @@ +package rover + +class Server[C](identifier: Session[C]#Identifier) { + // TODO: Determine what to do with this + def clientFromCredentials(credentials: C): Client[C] = { + null + } + + def createSession(credentials: C) = { + new Session[C](credentials, this, clientFromCredentials(credentials)) + } +} + +object Server { + def fromAddress[C](address: String): Server[C] = { + new Server[C](null) + } +} \ No newline at end of file diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala new file mode 100644 index 0000000..3d4f999 --- /dev/null +++ b/src/main/scala/rover/Session.scala @@ -0,0 +1,47 @@ +package rover + +import rover.rdo.AtomicObjectState +import rover.rdo.client.RdObject + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} + +class Session[C](credentials: C, server: Server[C], client: Client[C]) { + type Id = String + type Identifier = C => Id + + // TODO: Move this to the proper place + type ObjectId = Id + +// val server: Server[C] = null +// val client: Client[C] = null + + // TODO: Implement errors + def importRDO[T](objectId: ObjectId): Future[RdObject[T]] = { + // TODO: Hacks + async { + if (objectId == "chat") new RdObject[T]( AtomicObjectState.initial(List[Any]().asInstanceOf[T]) ) + else null + } + } + + def exportRDO[T](rdo: RdObject[T]): ObjectId = { + null + } + + +} + +object Session { + // TODO: Probably better to make this mutable? It needs to be persistent in any case + private var _CACHE = Map[Session[Any]#Id, Session[Any]]() + + def get[T](sessionId: Session[T]#Id): Session[T] = { + _CACHE.get(sessionId).asInstanceOf[Session[T]] + } + + def set[T](sessionId: Session[T]#Id, session: Session[T]): Unit = { + _CACHE = _CACHE.updated(sessionId, session.asInstanceOf[Session[Any]]) + } +} \ No newline at end of file diff --git a/src/main/scala/rover/rdo/client/RdObject.scala b/src/main/scala/rover/rdo/client/RdObject.scala index 13add53..f50200b 100644 --- a/src/main/scala/rover/rdo/client/RdObject.scala +++ b/src/main/scala/rover/rdo/client/RdObject.scala @@ -4,10 +4,10 @@ import rover.rdo.AtomicObjectState import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async.{async} -import scala.concurrent.{Promise} +import scala.concurrent.{Promise, Future} //FIXME: use hashes instead of Longs/Strings? -abstract class RdObject[A](var state: AtomicObjectState[A]) { +class RdObject[A](var state: AtomicObjectState[A]) { // TODO: "is up to date" or "version" methods @@ -16,7 +16,11 @@ abstract class RdObject[A](var state: AtomicObjectState[A]) { // onStateModified(state) } - protected def onStateModified(oldState: AtomicObjectState[A]): Promise[Unit] + protected def onStateModified(oldState: AtomicObjectState[A]): Future[Unit] = { + async { + + } + } protected final def immutableState: A = { return state.immutableState @@ -36,8 +40,9 @@ abstract class RdObject[A](var state: AtomicObjectState[A]) { * @param other Some other RDO */ class CommonAncestor[A](private val one: RdObject[A], private val other: RdObject[A]) extends RdObject[A](null) { // todo: fixme with a deferred state - override def onStateModified(oldState: AtomicObjectState[A]): Promise[Unit] = { - Promise() completeWith async { } + override def onStateModified(oldState: AtomicObjectState[A]): Future[Unit] = { +// Promise() completeWith async { } + async {} } // determine it once and defer all RdObject methods to it From ab88ccce77333ddd12900f2e86b0f5a5933b3d36 Mon Sep 17 00:00:00 2001 From: giannislelekas Date: Wed, 20 Mar 2019 19:43:28 +0100 Subject: [PATCH 3/9] initialising AtomicState with list of immutable states --- src/main/scala/rover/rdo/AtomicObjectState.scala | 8 ++++++++ src/main/scala/rover/rdo/client/Log.scala | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/src/main/scala/rover/rdo/AtomicObjectState.scala b/src/main/scala/rover/rdo/AtomicObjectState.scala index f43c0b6..843fd45 100644 --- a/src/main/scala/rover/rdo/AtomicObjectState.scala +++ b/src/main/scala/rover/rdo/AtomicObjectState.scala @@ -27,6 +27,10 @@ class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A]) e } } + def getLog: Log[A] = { + return this.log + } + override def toString: String = { value.toString } @@ -37,6 +41,10 @@ object AtomicObjectState { return new AtomicObjectState[A](value, Log.withInitialState(value)) } + def initial[A](value: List[A]): AtomicObjectState[A] = { + return new AtomicObjectState[A](value.last, Log.withInitialStates(value.map(v => LogRecord[A](v, null, v)))) + } + def fromLog[A](log: Log[A]): AtomicObjectState[A] = { return new AtomicObjectState[A](log.asList.last.stateResult, log) } diff --git a/src/main/scala/rover/rdo/client/Log.scala b/src/main/scala/rover/rdo/client/Log.scala index 9907ad1..91916d6 100644 --- a/src/main/scala/rover/rdo/client/Log.scala +++ b/src/main/scala/rover/rdo/client/Log.scala @@ -16,6 +16,11 @@ class Log[A](private val logList: List[LogRecord[A]] = List()) { return new Log[A](list) } + def concatenated(listOfRecords: List[LogRecord[A]]): Log[A] = { + val list = this.logList ++ listOfRecords + return new Log[A](list) + } + def asList: List[LogRecord[A]] = { return logList } @@ -30,4 +35,8 @@ object Log { def withInitialState[A](a: A): Log[A] = { return new Log[A]().appended(LogRecord[A](a,null, a)) } + + def withInitialStates[A](a: List[LogRecord[A]]): Log[A] = { + return new Log[A]().concatenated(a) + } } From 55ee582afb994b88ce8b97c182246c34d6103e54 Mon Sep 17 00:00:00 2001 From: giannislelekas Date: Thu, 21 Mar 2019 11:54:57 +0100 Subject: [PATCH 4/9] client-server logic for importing-exporing RDOs within a Session --- src/main/scala/rover/Client.scala | 29 +++++++++++++++++++----- src/main/scala/rover/Server.scala | 36 ++++++++++++++++++++++++------ src/main/scala/rover/Session.scala | 26 ++++++++++++--------- 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala index fa9c675..3bf4f5c 100644 --- a/src/main/scala/rover/Client.scala +++ b/src/main/scala/rover/Client.scala @@ -1,19 +1,38 @@ package rover -class Client[C](serverAddress: String, identifier: Session[C]#Identifier) { - val server = Server.fromAddress[C](serverAddress) +import rover.rdo.AtomicObjectState - def createSession(credentials: C) = { +//class Client[C](serverAddress: String, identifier: Session[C]#Identifier) { +// val server = Server.fromAddress[C](serverAddress) +// +// def createSession(credentials: C) = { +// server.createSession(credentials) +// } +//} + +class Client[C, A](private val serverAddress: String, private val id: C, private var mapToStates: Map[String, AtomicObjectState[A]]){ + private val identifier: Session[C, A]#Identifier = null + + val server = Server.fromAddress[C,A](serverAddress) + def createSession(credentials: C): Session[C, A] = { server.createSession(credentials) } + + def appended(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + this.mapToStates = this.mapToStates + (stateId -> atomicState) + } + + def getAtomicStateWithId(stateId: String): AtomicObjectState[A] ={ + return mapToStates(stateId) + } } object Client { class OAuth2Credentials(val accessToken: String, val refreshToken: String) {} - type OAuth2Client = Client[OAuth2Credentials] + type OAuth2Client[A] = Client[OAuth2Credentials, A] - def oauth2(): OAuth2Client = { + def oauth2[A](): OAuth2Client[A] = { null } } diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index d6d2078..c4db691 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -1,18 +1,40 @@ package rover -class Server[C](identifier: Session[C]#Identifier) { +import rover.rdo.AtomicObjectState +import rover.rdo.client.{CommonAncestor, RdObject} + +class Server[C, A](identifier: Session[C, A]#Identifier, private val address: String, + private val mapToClients: Map[C, Client[C, A]], private var mapToStates: Map[String, AtomicObjectState[A]]) { // TODO: Determine what to do with this - def clientFromCredentials(credentials: C): Client[C] = { - null + def clientFromCredentials(credentials: C): Client[C, A] = { + new Client[C, A](this.address, credentials, Map[String, AtomicObjectState[A]]()) + } + + def createSession(credentials: C): Session[C, A] = { + new Session[C, A](credentials, this, clientFromCredentials(credentials)) + } + + def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + this.mapToStates = this.mapToStates + (stateId -> atomicState) + } + + def getAtomicStateWithId(stateId: String): AtomicObjectState[A] = { + return mapToStates(stateId) } - def createSession(credentials: C) = { - new Session[C](credentials, this, clientFromCredentials(credentials)) + def receivedState(stateId: String, state: AtomicObjectState[A]): Unit ={ + val clientRDO = new RdObject[A](state) + val serverRDO = new RdObject[A](mapToStates(stateId)) + val ancestor = new CommonAncestor[A](serverRDO, clientRDO) + if (ancestor == serverRDO) deliveredState(stateId, state) + else { + //FiXME: Conflict resolution and history diff stuff + } } } object Server { - def fromAddress[C](address: String): Server[C] = { - new Server[C](null) + def fromAddress[C, A](address: String): Server[C, A] = { + return new Server[C, A](null, address, Map[C, Client[C,A]](), Map[String, AtomicObjectState[A]]()) } } \ No newline at end of file diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala index 3d4f999..d3c2da3 100644 --- a/src/main/scala/rover/Session.scala +++ b/src/main/scala/rover/Session.scala @@ -7,7 +7,7 @@ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async.{async, await} -class Session[C](credentials: C, server: Server[C], client: Client[C]) { +class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) { type Id = String type Identifier = C => Id @@ -18,16 +18,22 @@ class Session[C](credentials: C, server: Server[C], client: Client[C]) { // val client: Client[C] = null // TODO: Implement errors - def importRDO[T](objectId: ObjectId): Future[RdObject[T]] = { + def importRDOwithState[A](objectId: ObjectId, stateId: String): Future[Unit] = { // TODO: Hacks async { - if (objectId == "chat") new RdObject[T]( AtomicObjectState.initial(List[Any]().asInstanceOf[T]) ) + if (objectId == "chat") { + val atomicState = server.getAtomicStateWithId(stateId) + client.appended(stateId, atomicState) + } else null } } - def exportRDO[T](rdo: RdObject[T]): ObjectId = { - null + def exportRDOwithState[A](stateId: String): Future[Unit] = { + async{ + val atomicState = client.getAtomicStateWithId(stateId) + server.receivedState(stateId, atomicState) + } } @@ -35,13 +41,13 @@ class Session[C](credentials: C, server: Server[C], client: Client[C]) { object Session { // TODO: Probably better to make this mutable? It needs to be persistent in any case - private var _CACHE = Map[Session[Any]#Id, Session[Any]]() + private var _CACHE = Map[Session[Any, Any]#Id, Session[Any, Any]]() - def get[T](sessionId: Session[T]#Id): Session[T] = { - _CACHE.get(sessionId).asInstanceOf[Session[T]] + def get[C, A](sessionId: Session[C, A]#Id): Session[C, A] = { + _CACHE.get(sessionId).asInstanceOf[Session[C, A]] } - def set[T](sessionId: Session[T]#Id, session: Session[T]): Unit = { - _CACHE = _CACHE.updated(sessionId, session.asInstanceOf[Session[Any]]) + def set[C, A](sessionId: Session[C, A]#Id, session: Session[C, A]): Unit = { + _CACHE = _CACHE.updated(sessionId, session.asInstanceOf[Session[Any, Any]]) } } \ No newline at end of file From 3c3c8801ee130ade3f0dc7e8956988156c861a7a Mon Sep 17 00:00:00 2001 From: giannislelekas Date: Thu, 21 Mar 2019 12:56:06 +0100 Subject: [PATCH 5/9] Fixing the build --- src/main/scala/chatapp/Chat.scala | 15 ++++++++------- src/main/scala/rover/Client.scala | 5 ++--- src/main/scala/rover/Server.scala | 13 ++++++++----- src/main/scala/rover/Session.scala | 9 +++++++++ src/main/scala/rover/rdo/AtomicObjectState.scala | 6 +++--- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/src/main/scala/chatapp/Chat.scala b/src/main/scala/chatapp/Chat.scala index f4796b5..a51c478 100644 --- a/src/main/scala/chatapp/Chat.scala +++ b/src/main/scala/chatapp/Chat.scala @@ -32,7 +32,7 @@ class ChatMessage(val body: String, // FIXME: ensure messages can be read, but not modified or reassigned... // FIXME: after state & rd object impl change -class Chat(_onStateModified: Chat#Updater, initialState: AtomicObjectState[List[ChatMessage]] = AtomicObjectState.initial(List[ChatMessage]())) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { +class Chat(_onStateModified: Chat#Updater, initialState: AtomicObjectState[List[ChatMessage]]) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { // type State = List[ChatMessage] type Updater = AtomicObjectState[List[ChatMessage]] => Future[Unit] @@ -60,12 +60,13 @@ object Chat { } } -class ChatServer() extends Server[OAuth2Credentials](null) { +//class ChatServer extends Server[OAuth2Credentials, List[ChatMessage]] { +// +//} -} - -class ChatClient(serverAddress: String) extends Client.OAuth2Client(serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken) { - var session: Session[OAuth2Credentials] = null +class ChatClient(serverAddress: String) extends + Client.OAuth2Client[List[ChatMessage]](serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken, Map[String, AtomicObjectState[List[ChatMessage]]]()) { + var session: Session[OAuth2Credentials, List[ChatMessage]] = null var user: User = null var chat: Chat = null; @@ -88,7 +89,7 @@ class ChatClient(serverAddress: String) extends Client.OAuth2Client(serverAddres val credentials = new OAuth2Credentials("fake credentials", "fake credentials") this.user = user session = createSession(credentials) - val rdo = await(session.importRDO[List[ChatMessage]]("chat")) + val rdo = new RdObject[List[ChatMessage]](await(session.importRDO("chat"))) chat = Chat.fromRDO(rdo, updater) } } diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala index 3bf4f5c..443fd4c 100644 --- a/src/main/scala/rover/Client.scala +++ b/src/main/scala/rover/Client.scala @@ -10,12 +10,11 @@ import rover.rdo.AtomicObjectState // } //} -class Client[C, A](private val serverAddress: String, private val id: C, private var mapToStates: Map[String, AtomicObjectState[A]]){ - private val identifier: Session[C, A]#Identifier = null +class Client[C, A](private val serverAddress: String, private val identifier: Session[C, A]#Identifier, private var mapToStates: Map[String, AtomicObjectState[A]]){ val server = Server.fromAddress[C,A](serverAddress) def createSession(credentials: C): Session[C, A] = { - server.createSession(credentials) + server.createSession(credentials, this.identifier) } def appended(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index c4db691..6b31b61 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -3,15 +3,18 @@ package rover import rover.rdo.AtomicObjectState import rover.rdo.client.{CommonAncestor, RdObject} -class Server[C, A](identifier: Session[C, A]#Identifier, private val address: String, +class Server[C, A]( private val address: String, private val mapToClients: Map[C, Client[C, A]], private var mapToStates: Map[String, AtomicObjectState[A]]) { + + val credentials = null + // TODO: Determine what to do with this - def clientFromCredentials(credentials: C): Client[C, A] = { + def clientFromCredentials(credentials: Session[C,A]#Identifier): Client[C, A] = { new Client[C, A](this.address, credentials, Map[String, AtomicObjectState[A]]()) } - def createSession(credentials: C): Session[C, A] = { - new Session[C, A](credentials, this, clientFromCredentials(credentials)) + def createSession(credentials: C, identifier: Session[C,A]#Identifier): Session[C, A] = { + new Session[C, A](credentials, this, clientFromCredentials(identifier)) } def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ @@ -35,6 +38,6 @@ class Server[C, A](identifier: Session[C, A]#Identifier, private val address: St object Server { def fromAddress[C, A](address: String): Server[C, A] = { - return new Server[C, A](null, address, Map[C, Client[C,A]](), Map[String, AtomicObjectState[A]]()) + return new Server[C, A](address, Map[C, Client[C,A]](), Map[String, AtomicObjectState[A]]()) } } \ No newline at end of file diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala index d3c2da3..a5249dc 100644 --- a/src/main/scala/rover/Session.scala +++ b/src/main/scala/rover/Session.scala @@ -18,6 +18,15 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) // val client: Client[C] = null // TODO: Implement errors + def importRDO(objectId: ObjectId): Future[AtomicObjectState[A]] = { + async{ + if (objectId == "chat"){ + AtomicObjectState.initial[A](List[Any]().asInstanceOf[A]) + } + else null + } + } + def importRDOwithState[A](objectId: ObjectId, stateId: String): Future[Unit] = { // TODO: Hacks async { diff --git a/src/main/scala/rover/rdo/AtomicObjectState.scala b/src/main/scala/rover/rdo/AtomicObjectState.scala index 843fd45..23b8aca 100644 --- a/src/main/scala/rover/rdo/AtomicObjectState.scala +++ b/src/main/scala/rover/rdo/AtomicObjectState.scala @@ -41,9 +41,9 @@ object AtomicObjectState { return new AtomicObjectState[A](value, Log.withInitialState(value)) } - def initial[A](value: List[A]): AtomicObjectState[A] = { - return new AtomicObjectState[A](value.last, Log.withInitialStates(value.map(v => LogRecord[A](v, null, v)))) - } +// def initial[A](value: List[A]): AtomicObjectState[A] = { +// return new AtomicObjectState[A](value.last, Log.withInitialStates(value.map(v => LogRecord[A](v, null, v)))) +// } def fromLog[A](log: Log[A]): AtomicObjectState[A] = { return new AtomicObjectState[A](log.asList.last.stateResult, log) From 1ba5480629b59df55e8fe02d5c853dba208ebb32 Mon Sep 17 00:00:00 2001 From: giannislelekas Date: Thu, 21 Mar 2019 22:07:55 +0100 Subject: [PATCH 6/9] small cleanup --- src/main/scala/rover/Client.scala | 11 ++--------- src/main/scala/rover/Server.scala | 22 ++++++++++++++-------- src/main/scala/rover/Session.scala | 4 +--- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala index 443fd4c..5b747b4 100644 --- a/src/main/scala/rover/Client.scala +++ b/src/main/scala/rover/Client.scala @@ -2,15 +2,8 @@ package rover import rover.rdo.AtomicObjectState -//class Client[C](serverAddress: String, identifier: Session[C]#Identifier) { -// val server = Server.fromAddress[C](serverAddress) -// -// def createSession(credentials: C) = { -// server.createSession(credentials) -// } -//} - -class Client[C, A](private val serverAddress: String, private val identifier: Session[C, A]#Identifier, private var mapToStates: Map[String, AtomicObjectState[A]]){ +class Client[C, A](private val serverAddress: String, private val identifier: Session[C, A]#Identifier, + private var mapToStates: Map[String, AtomicObjectState[A]]){ val server = Server.fromAddress[C,A](serverAddress) def createSession(credentials: C): Session[C, A] = { diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index 6b31b61..e5f165f 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -3,10 +3,12 @@ package rover import rover.rdo.AtomicObjectState import rover.rdo.client.{CommonAncestor, RdObject} -class Server[C, A]( private val address: String, - private val mapToClients: Map[C, Client[C, A]], private var mapToStates: Map[String, AtomicObjectState[A]]) { - val credentials = null +class Server[C, A](private val address: String, private val mapToClients: Map[Session[C, A]#Identifier, Client[C, A]], + private var mapToStates: Map[String, AtomicObjectState[A]]) { + + //FIXME: Does the server has its own creds? It merely keeps track of clients' creds +// val credentials = null // TODO: Determine what to do with this def clientFromCredentials(credentials: Session[C,A]#Identifier): Client[C, A] = { @@ -17,14 +19,14 @@ class Server[C, A]( private val address: String, new Session[C, A](credentials, this, clientFromCredentials(identifier)) } - def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ - this.mapToStates = this.mapToStates + (stateId -> atomicState) - } - def getAtomicStateWithId(stateId: String): AtomicObjectState[A] = { return mapToStates(stateId) } + def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + this.mapToStates = this.mapToStates + (stateId -> atomicState) + } + def receivedState(stateId: String, state: AtomicObjectState[A]): Unit ={ val clientRDO = new RdObject[A](state) val serverRDO = new RdObject[A](mapToStates(stateId)) @@ -38,6 +40,10 @@ class Server[C, A]( private val address: String, object Server { def fromAddress[C, A](address: String): Server[C, A] = { - return new Server[C, A](address, Map[C, Client[C,A]](), Map[String, AtomicObjectState[A]]()) + return new Server[C, A](address, Map[Session[C, A]#Identifier, Client[C,A]](), Map[String, AtomicObjectState[A]]()) + } + + def getMapOfServer[C, A](server: Server[C, A]): Map[String, AtomicObjectState[A]] ={ + return server.mapToStates } } \ No newline at end of file diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala index a5249dc..a5cc452 100644 --- a/src/main/scala/rover/Session.scala +++ b/src/main/scala/rover/Session.scala @@ -14,9 +14,7 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) // TODO: Move this to the proper place type ObjectId = Id -// val server: Server[C] = null -// val client: Client[C] = null - + // TODO: Implement errors def importRDO(objectId: ObjectId): Future[AtomicObjectState[A]] = { async{ From 48fb33637859296aaebe5ce96f884bbb6b123d74 Mon Sep 17 00:00:00 2001 From: giannislelekas Date: Thu, 21 Mar 2019 22:26:38 +0100 Subject: [PATCH 7/9] more cleanup and some comments added to client-server logic --- src/main/scala/rover/Client.scala | 10 +++++++++- src/main/scala/rover/Server.scala | 6 ++++++ src/main/scala/rover/Session.scala | 4 ++-- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala index 5b747b4..fac5e6a 100644 --- a/src/main/scala/rover/Client.scala +++ b/src/main/scala/rover/Client.scala @@ -2,6 +2,14 @@ package rover import rover.rdo.AtomicObjectState +/** + * Class encapsulating a Client, who interacts with a Server within a Session + * for exchanging RDOs (in fact their atomic states). + * @param serverAddress. the address of the corresponding server + * @param identifier, the access token granted to the client for authorized access to server + * @param mapToStates, map to up-to-date version of local RDOs + */ +//FIXME: create a unique, static id for each RDO upon its creation class Client[C, A](private val serverAddress: String, private val identifier: Session[C, A]#Identifier, private var mapToStates: Map[String, AtomicObjectState[A]]){ @@ -10,7 +18,7 @@ class Client[C, A](private val serverAddress: String, private val identifier: Se server.createSession(credentials, this.identifier) } - def appended(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ + def appendedState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ this.mapToStates = this.mapToStates + (stateId -> atomicState) } diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index e5f165f..2064ad7 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -4,6 +4,12 @@ import rover.rdo.AtomicObjectState import rover.rdo.client.{CommonAncestor, RdObject} +/** + * Encapsulating the logic of the server. + * @param address of the server + * @param mapToClients, map to clients using as key the access token granted to the client + * @param mapToStates, map to stable (committed) states of RDOs +*/ class Server[C, A](private val address: String, private val mapToClients: Map[Session[C, A]#Identifier, Client[C, A]], private var mapToStates: Map[String, AtomicObjectState[A]]) { diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala index a5cc452..95a18c9 100644 --- a/src/main/scala/rover/Session.scala +++ b/src/main/scala/rover/Session.scala @@ -14,7 +14,7 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) // TODO: Move this to the proper place type ObjectId = Id - + // TODO: Implement errors def importRDO(objectId: ObjectId): Future[AtomicObjectState[A]] = { async{ @@ -30,7 +30,7 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) async { if (objectId == "chat") { val atomicState = server.getAtomicStateWithId(stateId) - client.appended(stateId, atomicState) + client.appendedState(stateId, atomicState) } else null } From 945c79b7a9c9a95362dc8046807958319554d545 Mon Sep 17 00:00:00 2001 From: Steffan Sluis Date: Thu, 21 Mar 2019 22:19:01 +0100 Subject: [PATCH 8/9] Http server with json encoding and decoding - [x] Basic updating of state (LRU) - [x] Client uses http api - [x] Pull initial client state from server - [x] Working state sync (LRU) --- build.gradle | 32 +++- src/main/scala/chatapp/CLI.scala | 25 ++++ src/main/scala/chatapp/Chat.scala | 135 +---------------- src/main/scala/chatapp/Client.scala | 140 ++++++++++++++++++ src/main/scala/chatapp/Message.scala | 38 +++++ src/main/scala/chatapp/Server.scala | 19 +++ src/main/scala/chatapp/User.scala | 29 ++++ src/main/scala/rover/Client.scala | 86 ++++++++++- src/main/scala/rover/Server.scala | 98 +++++++++++- src/main/scala/rover/Session.scala | 5 +- .../scala/rover/rdo/AtomicObjectState.scala | 46 +++++- 11 files changed, 511 insertions(+), 142 deletions(-) create mode 100644 src/main/scala/chatapp/CLI.scala create mode 100644 src/main/scala/chatapp/Client.scala create mode 100644 src/main/scala/chatapp/Message.scala create mode 100644 src/main/scala/chatapp/Server.scala create mode 100644 src/main/scala/chatapp/User.scala diff --git a/build.gradle b/build.gradle index 3c5a069..e93ca07 100644 --- a/build.gradle +++ b/build.gradle @@ -8,15 +8,32 @@ repositories { mavenCentral() jcenter() } +configurations { + scalaCompilerPlugin +} dependencies { implementation 'org.scala-lang:scala-library:2.12.8' implementation 'org.scala-lang.modules:scala-async_2.12:0.9.7' compile 'com.monovore:decline_2.12:0.6.1' compile 'org.typelevel:cats-core_2.12:1.5.0' + compile group: 'com.criteo.lolhttp', name: 'lolhttp_2.12', version: '10.0.0' + compile group: 'com.criteo.lolhttp', name: 'lolhtml_2.12', version: '10.0.0' + compile group: 'com.criteo.lolhttp', name: 'loljson_2.12', version: '10.0.0' + compile group: 'io.circe', name: 'circe-core_2.12', version: '0.11.1' + compile group: 'io.circe', name: 'circe-generic_2.12', version: '0.11.1' + compile group: 'io.circe', name: 'circe-optics_2.12', version: '0.11.0' testImplementation 'org.pegdown:pegdown:1.4.2' testImplementation 'org.scalatest:scalatest_2.12:3.0.6' testImplementation 'junit:junit:4.12' + + scalaCompilerPlugin "org.scalamacros:paradise_2.12.8:2.1.0" +} + +tasks.withType(ScalaCompile) { + scalaCompileOptions.additionalParameters = [ + "-Xplugin:" + configurations.scalaCompilerPlugin.asPath + ] } task wrapper(type: Wrapper) { @@ -30,7 +47,20 @@ idea{ } task run(type: JavaExec, dependsOn: classes) { - main = 'chatapp.Chat' + main = 'chatapp.ChatClient' standardInput = System.in classpath sourceSets.main.runtimeClasspath } + +compileJava { + options.compilerArgs += [ + "--add-modules", "java.se", + "--add-exports", "java.base/jdk.internal.ref=ALL-UNNAMED", + "--add-opens", "java.base/java.lang=ALL-UNNAMED", + "--add-opens", "java.base/java.nio=ALL-UNNAMED", + "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens", "java.management/sun.management=ALL-UNNAMED", + "--add-opens", "jdk.management/com.sun.management.internal=ALL-UNNAMED" + ] +} + diff --git a/src/main/scala/chatapp/CLI.scala b/src/main/scala/chatapp/CLI.scala new file mode 100644 index 0000000..73724ee --- /dev/null +++ b/src/main/scala/chatapp/CLI.scala @@ -0,0 +1,25 @@ +package chatapp + +import cats.implicits._ +import com.monovore.decline._ + +class ChatCLI { + +} + +// TODO: Add client and server subcommands +object ChatCLI extends CommandApp( + name = "rover-chat", + header = "Says hello!", + main = { + val userOpt = + Opts.option[String]("target", help = "Person to greet.") .withDefault("world") + + val quietOpt = Opts.flag("quiet", help = "Whether to be quiet.").orFalse + + (userOpt, quietOpt).mapN { (user, quiet) => + if (quiet) println("...") + else println(s"Hello $user!") + } + } +) diff --git a/src/main/scala/chatapp/Chat.scala b/src/main/scala/chatapp/Chat.scala index a51c478..c400834 100644 --- a/src/main/scala/chatapp/Chat.scala +++ b/src/main/scala/chatapp/Chat.scala @@ -2,44 +2,23 @@ package chatapp import rover.rdo.AtomicObjectState import rover.rdo.client.RdObject -import cats.implicits._ -import com.monovore.decline._ -import rover.Client.OAuth2Credentials -import rover.{Client, Server, Session} import scala.concurrent.ExecutionContext.Implicits.global import scala.async.Async.{async, await} -import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ -class User(val username: String) { - -} - -object User { - val Steffan = new User("steffan") - val Giannis = new User("giannis") -} - -class ChatMessage(val body: String, - val author: User, - val timestamp: Long = java.time.Instant.now.getEpochSecond()) { - - override def toString: String = { - s"${author.username}: $body" - } -} // FIXME: ensure messages can be read, but not modified or reassigned... // FIXME: after state & rd object impl change -class Chat(_onStateModified: Chat#Updater, initialState: AtomicObjectState[List[ChatMessage]]) extends RdObject[List[ChatMessage]](AtomicObjectState.initial(List[ChatMessage]())) { +class Chat(_onStateModified: Chat#Updater, _initialState: AtomicObjectState[List[ChatMessage]]) extends RdObject[List[ChatMessage]](_initialState) { // type State = List[ChatMessage] type Updater = AtomicObjectState[List[ChatMessage]] => Future[Unit] - def send(message: ChatMessage): Promise[Unit]= { + def send(message: ChatMessage): Future[Unit]= { val op: AtomicObjectState[List[ChatMessage]]#Op = s => s :+ message - Promise() completeWith async { + async { modifyState(op) } } @@ -60,110 +39,4 @@ object Chat { } } -//class ChatServer extends Server[OAuth2Credentials, List[ChatMessage]] { -// -//} - -class ChatClient(serverAddress: String) extends - Client.OAuth2Client[List[ChatMessage]](serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken, Map[String, AtomicObjectState[List[ChatMessage]]]()) { - var session: Session[OAuth2Credentials, List[ChatMessage]] = null - var user: User = null - var chat: Chat = null; - - val printer = (string: String) => { - val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" - // Prepend two spaces to match input indentation of "> " - val text = string.split("\n").map(line => s" $line").mkString("\n") - - s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" - } - - // TODO: This is hacky, figure out a better way to do this - val updater: Chat#Updater = state => async { - val text = state.immutableState.takeRight(ChatClient.SIZE).map(m => m.toString()).mkString("\n") - print(s"${printer(text)}") - } - - def login(user: User): Future[Unit] = { - async { - val credentials = new OAuth2Credentials("fake credentials", "fake credentials") - this.user = user - session = createSession(credentials) - val rdo = new RdObject[List[ChatMessage]](await(session.importRDO("chat"))) - chat = Chat.fromRDO(rdo, updater) - } - } - - def render(): Future[Unit] = { -// val chat = new Chat(updater) - println(" Welcome to Rover Chat!") - print((1 to ChatClient.SIZE).map(i => "\n").mkString("")) - - // Simulate conversation - async { - Thread.sleep(3000) - await(chat.send(new ChatMessage("Hey man!", User.Giannis)).future) - updater(chat.state) - - Thread.sleep(3000) - await(chat.send(new ChatMessage("How's it going?", User.Giannis)).future) - updater(chat.state) - - Thread.sleep(10000) - await(chat.send(new ChatMessage("Yea man I'm good", User.Giannis)).future) - updater(chat.state) - } - - val reader = () => { - print("> ") - val s = scala.io.StdIn.readLine() - s - } - val executor = (input: String) => { - async { - val p = chat.send(new ChatMessage(input, user)) - await(p.future) - // This clears the input line - print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") - chat.state.immutableState.takeRight(ChatClient.SIZE).map(m => s"${m.toString()}").mkString("\n") - } - } - val repl: REPL[String] = new REPL(reader, executor, printer) -// Await.result(repl.loop(), Duration.Inf) - repl.loop() - } - -} - -object ChatClient { - val SIZE = 10 - - def main(args: Array[String]): Unit = { - val serverAddress = "bla" - val client = new ChatClient(serverAddress) - val f = async { - await(client.login(User.Steffan)) - await(client.render()) - } - - Await.result(f, Duration.Inf) - } -} - - -// TODO: Add client and server subcommands -object ChatCLI extends CommandApp( - name = "rover-chat", - header = "Says hello!", - main = { - val userOpt = - Opts.option[String]("target", help = "Person to greet.") .withDefault("world") - - val quietOpt = Opts.flag("quiet", help = "Whether to be quiet.").orFalse - (userOpt, quietOpt).mapN { (user, quiet) => - if (quiet) println("...") - else println(s"Hello $user!") - } - } -) diff --git a/src/main/scala/chatapp/Client.scala b/src/main/scala/chatapp/Client.scala new file mode 100644 index 0000000..cf67615 --- /dev/null +++ b/src/main/scala/chatapp/Client.scala @@ -0,0 +1,140 @@ +package chatapp + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState +import rover.rdo.client.RdObject +import rover.{Client, HTTPClient, Session} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} +import scala.concurrent.{Await, Future, Promise} +import scala.concurrent.duration._ + +class ChatClient(serverAddress: String) extends + HTTPClient[List[ChatMessage]](serverAddress, (credentials) => credentials.accessToken) { +// Client.OAuth2Client[List[ChatMessage]](serverAddress, (credentials: OAuth2Credentials) => credentials.accessToken, Map[String, AtomicObjectState[List[ChatMessage]]]()) { + var session: Session[OAuth2Credentials, List[ChatMessage]] = null + var user: ChatUser = null + var chat: Chat = null; + + val printer = (string: String) => { + val cls = s"${string.split("\n").map(c => s"${REPL.UP}${REPL.ERASE_LINE_BEFORE}${REPL.ERASE_LINE_AFTER}").mkString("")}" + // Prepend two spaces to match input indentation of "> " + val text = string.split("\n").map(line => s" $line").mkString("\n") + + s"${REPL.SAVE_CURSOR}$cls\r$text${REPL.RESTORE_CURSOR}" + } + + // TODO: This is hacky, figure out a better way to do this + val updater: Chat#Updater = state => async { + val text = state.immutableState.takeRight(ChatClient.SIZE).map(m => m.toString()).mkString("\n") + print(s"${printer(text)}") + } + + def login(user: ChatUser): Future[Unit] = { + async { + val credentials = new OAuth2Credentials("fake credentials", "fake credentials") + this.user = user + session = createSession(credentials) + val state = importRDO("chat") + val rdo = new RdObject[List[ChatMessage]](state) + chat = Chat.fromRDO(rdo, updater) +// println(s"Initial state: ${chat.state}") + } + } + + def send(message: String): Future[Unit] = { +// println(s"Sending message with intial state: ${chat.state}") + async { + await(chat.send(new ChatMessage(message, user))) + exportRDO("chat", chat.state) + // await(session.exportRDO("chat", chat)) +// updater(chat.state) + } + } + + def updateLoop(): Future[Unit] = { + async { + while(true) { +// val serverState = await(importRDO("chat")).asInstanceOf[RdObject[List[ChatMessage]]] +// chat = Chat.fromRDO(serverState, updater) +// println(s"Updating state from update loop...") + val state = importRDO("chat") +// println(s"Got updated state: $state") + // chat = Chat.fromRDO(rdo, updater) + + appendedState("chat", state) + chat.state = state + +// println(s"Rendering chat state: ${chat.state}") + updater(chat.state) // Force re-render + Thread.sleep(ChatClient.UPDATE_DELAY_MS) + } + } + } + + def render(): Future[Unit] = { +// val chat = new Chat(updater) + println(s" Welcome to Rover Chat! Connected to: $serverAddress") + print((1 to ChatClient.SIZE).map(i => "\n").mkString("")) + + val reader = () => { + print("> ") + val s = scala.io.StdIn.readLine() + s + } + val executor = (input: String) => { + async { + val p = send(input) + await(p) + // This clears the input line + print(s"${REPL.UP}${REPL.ERASE_LINE_AFTER}") + chat.state.immutableState.takeRight(ChatClient.SIZE).map(m => s"${m.toString()}").mkString("\n") + } + } + val repl: REPL[String] = new REPL(reader, executor, printer) +// Await.result(repl.loop(), Duration.Inf) + updateLoop() // TODO: Memory leak + repl.loop() + } + +} + +object ChatClient { + val SIZE = 10 + val UPDATE_DELAY_MS = 2000 + + def main(args: Array[String]): Unit = { + val serverAddress = "localhost" + val client = new ChatClient(serverAddress) + val f = async { + await(client.login(ChatUser.Steffan)) + await(client.render()) + } + + Await.result(f, Duration.Inf) + } +} + +object Bot { + def main(args: Array[String]): Unit = { + val serverAddress = "localhost" + val client = new ChatClient(serverAddress) + val f = async { + await(client.login(ChatUser.Giannis)) + client.updateLoop() // TODO: Memory leak + + // Simulate conversation + Thread.sleep(3000) + await(client.send("Hey man!")) + + Thread.sleep(3000) + await(client.send("How's it going?")) + + Thread.sleep(10000) + await(client.send("Yea man I'm good")) + } + + Await.result(f, Duration.Inf) + } +} \ No newline at end of file diff --git a/src/main/scala/chatapp/Message.scala b/src/main/scala/chatapp/Message.scala new file mode 100644 index 0000000..661f6b7 --- /dev/null +++ b/src/main/scala/chatapp/Message.scala @@ -0,0 +1,38 @@ +package chatapp +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Json} + + +class ChatMessage(val body: String, + val author: ChatUser, + val timestamp: Long = java.time.Instant.now.getEpochSecond()) { + + // def encoded: Json = Encoder.forProduct2("author", "body")(m => m.body, m.author.username) + + override def toString: String = { + s"${author.username}: $body" + } +} + +object ChatMessage { + implicit val encodeChatMessage: Encoder[ChatMessage] = new Encoder[ChatMessage] { + final def apply(m: ChatMessage): Json = Json.obj( + ("author", m.author.asJson), +// ("author", Json.fromString(m.author.username)), + ("body", Json.fromString(m.body)), + ("timestamp", Json.fromLong(m.timestamp)) + ) + } + + implicit val decodeAtomicObjectState: Decoder[ChatMessage] = new Decoder[ChatMessage] { + final def apply(c: HCursor): Decoder.Result[ChatMessage] = + for { + author <- c.downField("author").as[ChatUser] + body <- c.downField("body").as[String] + timestamp <- c.downField("timestamp").as[Long] + } yield { + new ChatMessage(body, author) + } + } + +} diff --git a/src/main/scala/chatapp/Server.scala b/src/main/scala/chatapp/Server.scala new file mode 100644 index 0000000..7b0d7ff --- /dev/null +++ b/src/main/scala/chatapp/Server.scala @@ -0,0 +1,19 @@ +package chatapp + +import rover.Client.OAuth2Credentials +import rover.rdo.AtomicObjectState +import rover.{Client, Server, Session, HTTPServer} + + +class ChatServer extends HTTPServer[List[ChatMessage]](_mapToStates = Map("chat" -> ChatServer.CHAT_STATE)) { + +} + +object ChatServer { + val CHAT_STATE = AtomicObjectState.initial(List[ChatMessage](new ChatMessage("test", ChatUser.Steffan))) + + def main(args: Array[String]): Unit = { + val server = new ChatServer + server.start() + } +} diff --git a/src/main/scala/chatapp/User.scala b/src/main/scala/chatapp/User.scala new file mode 100644 index 0000000..f5d34be --- /dev/null +++ b/src/main/scala/chatapp/User.scala @@ -0,0 +1,29 @@ +package chatapp + + +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Json} + +class ChatUser(val username: String) { +} + +object ChatUser { + val Steffan = new ChatUser("steffan") + val Giannis = new ChatUser("giannis") + + implicit val encodeUser: Encoder[ChatUser] = new Encoder[ChatUser] { + final def apply(u: ChatUser): Json = Json.obj( + ("username", Json.fromString(u.username)) + ) + } + + implicit val decodeAtomicObjectState: Decoder[ChatUser] = new Decoder[ChatUser] { + final def apply(c: HCursor): Decoder.Result[ChatUser] = + for { + username <- c.downField("username").as[String] + } yield { + new ChatUser(username) + } + } + +} \ No newline at end of file diff --git a/src/main/scala/rover/Client.scala b/src/main/scala/rover/Client.scala index fac5e6a..9f1d7bf 100644 --- a/src/main/scala/rover/Client.scala +++ b/src/main/scala/rover/Client.scala @@ -1,7 +1,20 @@ package rover +import lol.http._ +import lol.json._ + +import rover.Client.OAuth2Credentials import rover.rdo.AtomicObjectState +import cats.implicits._ +import cats.effect.{ IO } + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.async.Async.{async, await} + +import io.circe._ + /** * Class encapsulating a Client, who interacts with a Server within a Session * for exchanging RDOs (in fact their atomic states). @@ -10,8 +23,9 @@ import rover.rdo.AtomicObjectState * @param mapToStates, map to up-to-date version of local RDOs */ //FIXME: create a unique, static id for each RDO upon its creation -class Client[C, A](private val serverAddress: String, private val identifier: Session[C, A]#Identifier, - private var mapToStates: Map[String, AtomicObjectState[A]]){ +class Client[C, A](protected val serverAddress: String, protected val identifier: Session[C, A]#Identifier, + protected var mapToStates: Map[String, AtomicObjectState[A]] = Map[String, AtomicObjectState[A]]()){ + val server = Server.fromAddress[C,A](serverAddress) def createSession(credentials: C): Session[C, A] = { @@ -36,3 +50,71 @@ object Client { null } } + +class HTTPClient[A](_serverAddress: String, _identifier: Session[OAuth2Credentials, A]#Identifier)(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends Client[OAuth2Credentials, A](_serverAddress, _identifier) { + + implicit val stateEncoder: Encoder[AtomicObjectState[A]] = Encoder.forProduct1("immutableState")(rdo => (rdo.immutableState)) + // implicit val stateDecoder: Decoder[AtomicObjectState[List[ChatMessage]]] = deriveDecoder[AtomicObjectState[List[ChatMessage]]] +// implicit val rdoEncoder: Encoder[RdObject[A]] = Encoder.forProduct1("state")(rdo => (rdo.state)) + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } + + def importRDO(objectId: String): AtomicObjectState[A] = { + val roverClient = lol.http.Client(serverAddress, 8888, "http") + val userAgent = h"User-Agent" -> h"lolhttp" + + val getState = (for { + result <- roverClient.run(Get(s"/api/rdo/$objectId").addHeaders(userAgent)) { + _.readSuccessAs[Json].map(json => { + json.as[AtomicObjectState[A]] + }) + } + + state <- IO { + result.right.get + } + _ <- roverClient.stop() + + } yield (state)).onError { case _ => roverClient.stop() } + + + getState.unsafeRunSync + + } + + def exportRDO(objectId: String, state: AtomicObjectState[A]): Unit = { + val roverClient = lol.http.Client(serverAddress, 8888, "http") + val userAgent = h"User-Agent" -> h"lolhttp" + +// println(s"Exporting RDO $objectId as $state") + val setState = (for { + result <- roverClient.run(Post(s"/api/rdo/$objectId", stateEncoder.apply(state).toString).addHeaders(userAgent)) { + _.readSuccessAs[Json].map(json => { +// println(s"Received JSON response: $json") +// json.as[AtomicObjectState[A]] + }) + } + +// state <- IO { +// if (result.isLeft) { +// println(s"${result.left.get}") +// } +// result.right.get +// } + _ <- roverClient.stop() + + } yield (state)).onError { case _ => roverClient.stop() } + + + setState.unsafeRunSync + + } + +} \ No newline at end of file diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index 2064ad7..f1b8322 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -1,17 +1,27 @@ package rover +import rover.Client.OAuth2Credentials import rover.rdo.AtomicObjectState import rover.rdo.client.{CommonAncestor, RdObject} +import lol.http._ +import lol.json._ +import scala.concurrent.ExecutionContext.Implicits.global +import cats.effect.IO +import chatapp.ChatMessage +import io.circe._ +import io.circe.syntax._ +import io.circe.generic.semiauto._ +import scala.util.Try /** * Encapsulating the logic of the server. * @param address of the server * @param mapToClients, map to clients using as key the access token granted to the client * @param mapToStates, map to stable (committed) states of RDOs */ -class Server[C, A](private val address: String, private val mapToClients: Map[Session[C, A]#Identifier, Client[C, A]], - private var mapToStates: Map[String, AtomicObjectState[A]]) { +class Server[C, A]( protected val address: String, + protected val mapToClients: Map[Session[C, A]#Identifier, Client[C, A]], protected var mapToStates: Map[String, AtomicObjectState[A]]) { //FIXME: Does the server has its own creds? It merely keeps track of clients' creds // val credentials = null @@ -26,7 +36,7 @@ class Server[C, A](private val address: String, private val mapToClients: Map[Se } def getAtomicStateWithId(stateId: String): AtomicObjectState[A] = { - return mapToStates(stateId) + return mapToStates(stateId) } def deliveredState(stateId: String, atomicState: AtomicObjectState[A]): Unit ={ @@ -44,7 +54,89 @@ class Server[C, A](private val address: String, private val mapToClients: Map[Se } } +class HTTPServer[A]( + val port: Int = 8888, + _mapToClients: Map[Session[OAuth2Credentials, A]#Identifier, Client[OAuth2Credentials, A]] = Map[Session[OAuth2Credentials, A]#Identifier, Client[OAuth2Credentials, A]](), + _mapToStates: Map[String, AtomicObjectState[A]] = Map[String, AtomicObjectState[A]]() +)(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends Server[OAuth2Credentials, A]("bla", _mapToClients, _mapToStates) { + def Error(msg: String): Json = Json.obj("error" -> Json.fromString(msg)) + + implicit val stateEncoder: Encoder[AtomicObjectState[A]] = Encoder.forProduct1("immutableState")(rdo => (rdo.immutableState)) +// implicit val stateDecoder: Decoder[AtomicObjectState[List[ChatMessage]]] = deriveDecoder[AtomicObjectState[List[ChatMessage]]] + implicit val rdoEncoder: Encoder[RdObject[A]] = Encoder.forProduct1("state")(rdo => (rdo.state)) + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } + +// val mapToState = _mapToStates; + + lazy val Hello: PartialService = { + case GET at "/hello" => + Ok("Hello World, from Rover!") +// case _ => +// NotFound + } + + lazy val Api: PartialService = { + // Nothing special here, but look how we handle the 404 case. + case GET at url"/api/rdo/$id" => + println(mapToStates) + println(s"Getting $id from $mapToStates") + IO { + Try(id).toOption.flatMap(mapToStates.get).map { state => + Ok(state.asJson) + }.getOrElse { + NotFound(Error(s"No rdo found for id: `$id'")) + } + } + + case request @ POST at url"/api/rdo/$id" => + println(s"Handling POST request for $id") + Try(id).toOption.flatMap(mapToStates.get).map { state => + // TODO: Decide what to return here +// val existing: RdObject[A] = new RdObject[A](state) + request.readAs[Json].map { jsonBody => + println(s"Request string: ${jsonBody.toString()}") + val updatedState = jsonBody.as[AtomicObjectState[A]] + if (updatedState.isLeft) { + val failure = updatedState.left.get + InternalServerError(Error(s"${failure.message}: \n${failure.history.mkString("\n")}")) + } else { + val updated = updatedState.right.get + deliveredState(id, updated) + Ok(new RdObject(updated).asJson) + } + // val updatedRDO: = state.copy( +// text = root.text.string.getOption(jsonBody).getOrElse(state.text), +// done = root.done.boolean.getOption(jsonBody).getOrElse(state.done) +// ) + } + }.getOrElse { + NotFound(Error(s"No rdo found for id: `$id'")) + } + + } + + def start(): Unit = { +// val + println(s"Starting server at port $port") + lol.http.Server.listen(port)(Hello.orElse(Api).orElse { case _ => NotFound }) + +// async { +// while(true) {} +// } + } + +} + object Server { +// val CHAT_STATE = AtomicObjectState.initial(List[Any]()) def fromAddress[C, A](address: String): Server[C, A] = { return new Server[C, A](address, Map[Session[C, A]#Identifier, Client[C,A]](), Map[String, AtomicObjectState[A]]()) } diff --git a/src/main/scala/rover/Session.scala b/src/main/scala/rover/Session.scala index 95a18c9..186ff77 100644 --- a/src/main/scala/rover/Session.scala +++ b/src/main/scala/rover/Session.scala @@ -1,5 +1,6 @@ package rover +import chatapp.ChatServer import rover.rdo.AtomicObjectState import rover.rdo.client.RdObject @@ -19,7 +20,7 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) def importRDO(objectId: ObjectId): Future[AtomicObjectState[A]] = { async{ if (objectId == "chat"){ - AtomicObjectState.initial[A](List[Any]().asInstanceOf[A]) + ChatServer.CHAT_STATE.asInstanceOf[AtomicObjectState[A]] } else null } @@ -32,7 +33,7 @@ class Session[C, A](credentials: C, server: Server[C, A], client: Client[C, A]) val atomicState = server.getAtomicStateWithId(stateId) client.appendedState(stateId, atomicState) } - else null +// else null } } diff --git a/src/main/scala/rover/rdo/AtomicObjectState.scala b/src/main/scala/rover/rdo/AtomicObjectState.scala index 23b8aca..9a0a0c3 100644 --- a/src/main/scala/rover/rdo/AtomicObjectState.scala +++ b/src/main/scala/rover/rdo/AtomicObjectState.scala @@ -2,10 +2,17 @@ package rover.rdo import rover.rdo.client.{Log, LogRecord} +import io.circe._, io.circe.syntax._ +import io.circe.{Encoder, Decoder, Json} + + // TODO: make ctor private -class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A]) extends ObjectState { +class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A])(implicit val encodeA: Encoder[A], implicit val decodeA: Decoder[A]) extends ObjectState { type Op = A => A + def encoded: Json = encodeA(value) +// def decoded: A = decodeA() + def immutableState: A = value def applyOp(operation: Op): AtomicObjectState[A] = { @@ -34,10 +41,27 @@ class AtomicObjectState[A](private val value: A, private[rdo] val log: Log[A]) e override def toString: String = { value.toString } + + implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { + final def apply(c: HCursor): Decoder.Result[AtomicObjectState[A]] = + for { + immutableState <- c.downField("immutableState").as[A] + } yield { + AtomicObjectState.initial(immutableState) + } + } +// implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[A]] = new Decoder[AtomicObjectState[A]] { +// final def apply(c: HCursor)(implicit encodeA: Encoder[A], decodeA: Decoder[A]): Decoder.Result[AtomicObjectState[A]] = +// for { +// immutableState <- c.downField("immutableState").as[A] +// } yield { +// AtomicObjectState.initial(immutableState) +// } +// } } object AtomicObjectState { - def initial[A](value: A): AtomicObjectState[A] = { + def initial[A](value: A)(implicit encodeA: Encoder[A], decodeA: Decoder[A]) : AtomicObjectState[A] = { return new AtomicObjectState[A](value, Log.withInitialState(value)) } @@ -45,7 +69,23 @@ object AtomicObjectState { // return new AtomicObjectState[A](value.last, Log.withInitialStates(value.map(v => LogRecord[A](v, null, v)))) // } - def fromLog[A](log: Log[A]): AtomicObjectState[A] = { + def fromLog[A](log: Log[A])(implicit encodeA: Encoder[A], decodeA: Decoder[A]): AtomicObjectState[A] = { return new AtomicObjectState[A](log.asList.last.stateResult, log) } + +// implicit val encodeAtomicObjectState: Encoder[AtomicObjectState[_]] = new Encoder[AtomicObjectState[_]] { +// final def apply(s: AtomicObjectState[_]): Json = Json.obj( +// ("", s.asJson) +// ) +// } + +// implicit val decodeAtomicObjectState: Decoder[AtomicObjectState[_]] = new Decoder[AtomicObjectState[_]] { +// final def apply[A](c: HCursor)(implicit encodeA: Encoder[A], decodeA: Decoder[A]): Decoder.Result[AtomicObjectState[A]] = +// for { +// immutableState <- c.downField("immutableState").as[A] +// } yield { +// AtomicObjectState.initial(immutableState) +// } +// } + } \ No newline at end of file From 50c8fd21daa9c3404e3121d05372e75acc5acc81 Mon Sep 17 00:00:00 2001 From: Steffan Sluis Date: Fri, 22 Mar 2019 16:26:32 +0100 Subject: [PATCH 9/9] Auth stub --- build.gradle | 1 + src/main/scala/rover/Server.scala | 189 +++++++++++++++++++++++++++++- 2 files changed, 187 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index e93ca07..a9e5220 100644 --- a/build.gradle +++ b/build.gradle @@ -23,6 +23,7 @@ dependencies { compile group: 'io.circe', name: 'circe-core_2.12', version: '0.11.1' compile group: 'io.circe', name: 'circe-generic_2.12', version: '0.11.1' compile group: 'io.circe', name: 'circe-optics_2.12', version: '0.11.0' + compile group: 'com.nulab-inc', name: 'scala-oauth2-core_2.12', version: '1.3.0' testImplementation 'org.pegdown:pegdown:1.4.2' testImplementation 'org.scalatest:scalatest_2.12:3.0.6' testImplementation 'junit:junit:4.12' diff --git a/src/main/scala/rover/Server.scala b/src/main/scala/rover/Server.scala index f1b8322..fa278a8 100644 --- a/src/main/scala/rover/Server.scala +++ b/src/main/scala/rover/Server.scala @@ -1,5 +1,8 @@ package rover +import java.security.Timestamp +import java.util.Date + import rover.Client.OAuth2Credentials import rover.rdo.AtomicObjectState import rover.rdo.client.{CommonAncestor, RdObject} @@ -8,12 +11,17 @@ import lol.json._ import scala.concurrent.ExecutionContext.Implicits.global import cats.effect.IO -import chatapp.ChatMessage +import chatapp.{ChatMessage, ChatUser} import io.circe._ import io.circe.syntax._ import io.circe.generic.semiauto._ +import scalaoauth2.provider.{AuthInfo, AuthorizationRequest, ClientCredential, DataHandler, Password, TokenEndpoint} import scala.util.Try +import scala.async.Async.{async, await} +import scala.concurrent.Future + + /** * Encapsulating the logic of the server. * @param address of the server @@ -83,6 +91,60 @@ class HTTPServer[A]( // NotFound } + val oauthDataHandler = new OAuthDataHandler() + lazy val OAuthApi: PartialService = { + // Nothing special here, but look how we handle the 404 case. + case GET at "/auth" => + println(s"Handling auth!") + IO.fromFuture[Response] { + IO { + async { + val request = new AuthorizationRequest( + Map("Authorization" -> Seq("Basic Y2xpZW50X2lkX3ZhbHVlOmNsaWVudF9zZWNyZXRfdmFsdWU=")), + Map("grant_type" -> Seq("password"), "username" -> Seq("steffan"), "password" -> Seq("pass"), "scope" -> Seq("all"))) + + // val dataHandler = + val f = new TokenEndpoint { + override val handlers = Map( + "password" -> new Password()) + }.handleRequest(request, oauthDataHandler) + + // async { + await(f) match { + case Left(x) => InternalServerError(Error(x.description)) + case Right(x) => Ok(x.accessToken) + // } + + // whenReady(f) { result => result should be('right) } + } } + } + } + +// IO { +// val request = new AuthorizationRequest( +// Map("Authorization" -> Seq("Basic Y2xpZW50X2lkX3ZhbHVlOmNsaWVudF9zZWNyZXRfdmFsdWU=")), +// Map("grant_type" -> Seq("password"), "username" -> Seq("steffan"), "password" -> Seq("pass"), "scope" -> Seq("all"))) +// +//// val dataHandler = +// val f = new TokenEndpoint { +// override val handlers = Map( +// "password" -> new Password()) +// }.handleRequest(request, oauthDataHandler) +// +//// async { +// await(f) match { +// case Left(x) => InternalServerError(Error(x.description)) +// case Right(x) => Ok(x.accessToken) +//// } +// +// // whenReady(f) { result => result should be('right) } +// } +// +// Ok("Test") +// } + + } + lazy val Api: PartialService = { // Nothing special here, but look how we handle the 404 case. case GET at url"/api/rdo/$id" => @@ -126,13 +188,12 @@ class HTTPServer[A]( def start(): Unit = { // val println(s"Starting server at port $port") - lol.http.Server.listen(port)(Hello.orElse(Api).orElse { case _ => NotFound }) + lol.http.Server.listen(port)(Hello.orElse(OAuthApi.orElse(Api)).orElse { case _ => NotFound }) // async { // while(true) {} // } } - } object Server { @@ -144,4 +205,126 @@ object Server { def getMapOfServer[C, A](server: Server[C, A]): Map[String, AtomicObjectState[A]] ={ return server.mapToStates } +} + +object Crypto { + def generateToken(): String = { + return "token" + } +} + +class OAuthDataHandler[User](identifier: OAuthDataHandler[User]#UserIdentifier = (user: User) => user.toString) extends DataHandler[User] { + type ID = String + type UserIdentifier = User => ID + + val _user: User = ChatUser.Steffan.asInstanceOf[User] + val _access_token: scalaoauth2.provider.AccessToken = scalaoauth2.provider.AccessToken("access_token!", Some("refresh_token"), Some("scope"), Some(3600.toLong), new Date()) + val _auth_info: scalaoauth2.provider.AuthInfo[User] = AuthInfo[User](_user, Some("chat"), Some("scope"), Some("")) + + override def validateClient(maybeCredential: Option[ClientCredential], request: AuthorizationRequest): Future[Boolean] = { + async { true } + } + + def deleteAuthCode(code: String): scala.concurrent.Future[Unit] = { + async { + + } + } + + def findUser(maybeCredential: Option[scalaoauth2.provider.ClientCredential],request: scalaoauth2.provider.AuthorizationRequest): scala.concurrent.Future[Option[User]] = { + async { + Some(_user) + } + } + +// def validateClient(clientId: String, clientSecret: String, grantType: String): Boolean = { +// true +// // Clients.validate(clientId, clientSecret, grantType) +// } + +// override def findUser(maybeCredential: Option[ClientCredential], request: AuthorizationRequest): Future[Option[User]] = + def findUser(username: String, password: String): Future[Option[User]] = { + async { + Some { + if (username == "steffan") ChatUser.Steffan.asInstanceOf[User] + else if (username == "giannis") ChatUser.Giannis.asInstanceOf[User] + else new ChatUser(username).asInstanceOf[User] + } + } + // Users.findUser(username, password) + } + + def createAccessToken(authInfo: AuthInfo[User]): Future[scalaoauth2.provider.AccessToken] = { +// val accessTokenExpiresIn = 60 * 60 // 1 hour +// val now = new Date() +// val createdAt = new Timestamp(now.getTime) +// val refreshToken = Some(Crypto.generateToken()) +// val accessToken = Crypto.generateToken() +// val userId = identifier(authInfo.user) + + async { + _access_token + } + +// val tokenObject = models.AccessToken(accessToken, refreshToken, userId, authInfo.scope, accessTokenExpiresIn, createdAt, authInfo.clientId) +// AccessTokens.deleteExistingAndCreate(tokenObject, userId, authInfo.clientId) +// scalaoauth2.provider.AccessToken(accessToken, refreshToken, authInfo.scope, Some(accessTokenExpiresIn.toLong), now) + } + + def getStoredAccessToken(authInfo: AuthInfo[User]): Future[Option[scalaoauth2.provider.AccessToken]] = { + val userId = identifier(authInfo.user) + async { + Some(_access_token) + } + // AccessTokens.findToken(userId, authInfo.clientId) map { a => +// scalaoauth2.provider.AccessToken(a.accessToken, a.refreshToken, a.scope, Some(a.expiresIn.toLong), a.createdAt) +// } + } + + def refreshAccessToken(authInfo: AuthInfo[User], refreshToken: String): Future[scalaoauth2.provider.AccessToken] = { +// createAccessToken(authInfo) + async { + _access_token + } + } + + def findClientUser(clientId: String, clientSecret: String, scope: Option[String]): Option[User] = { + None // Not implemented yet + } + + def findAccessToken(token: String): Future[Option[scalaoauth2.provider.AccessToken]] = { + async { Some(_access_token) } + // AccessTokens.findAccessToken(token) map { a => +// scalaoauth2.provider.AccessToken(a.accessToken, a.refreshToken, a.scope, Some(a.expiresIn.toLong), a.createdAt) +// } + } + + def findAuthInfoByAccessToken(accessToken: scalaoauth2.provider.AccessToken): Future[Option[AuthInfo[User]]] = { + async { + Some(_auth_info) + } +// AccessTokens.findAccessToken(accessToken.token) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, Some("")) +// } + } + + def findAuthInfoByRefreshToken(refreshToken: String): Future[Option[AuthInfo[User]]] = { + async { + Some(_auth_info) + } + // AccessTokens.findRefreshToken(refreshToken) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, Some("")) +// } + } + + def findAuthInfoByCode(code: String): Future[Option[AuthInfo[User]]] = { + async { Some(_auth_info) } + // AuthCodes.find(code) map { a => +// val user = Users.getById(a.userId).get +// AuthInfo(user, a.clientId, a.scope, a.redirectUri) +// } + } + } \ No newline at end of file