diff --git a/.travis.yml b/.travis.yml index 7d65391..923a2d1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: java jdk: - - oraclejdk8 + - openjdk14 script: ./gradlew shadowJar before_cache: - rm -f $HOME/.gradle/caches/modules-2/modules-2.lock diff --git a/build.gradle.kts b/build.gradle.kts index 60f9f9b..eec33cd 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,19 +1,17 @@ -import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar - -val vertxVersion = "3.6.2" +val vertxVersion = "4.0.0-milestone4" val mainVerticle = "io.vertx.examples.feeds.verticles.MainVerticle" buildscript { // Gradle plugins dependencies { - classpath("com.github.jengelman.gradle.plugins:shadow:4.0.2") // FatJar packaging + classpath("com.github.jengelman.gradle.plugins:shadow:5.2.0") // FatJar packaging } } plugins { java application - id("com.github.johnrengelman.shadow") version "4.0.2" + id("com.github.johnrengelman.shadow") version "5.2.0" } repositories { @@ -24,58 +22,62 @@ group = "com.github.aesteve" version = "" tasks.withType { - sourceCompatibility = "1.8" + sourceCompatibility = JavaVersion.VERSION_14.toString() + targetCompatibility = JavaVersion.VERSION_14.toString() } dependencies { // Vert.x standard - compile("io.vertx:vertx-core:$vertxVersion") - compile("io.vertx:vertx-web:$vertxVersion") + implementation("io.vertx:vertx-core:$vertxVersion") + implementation("io.vertx:vertx-web:$vertxVersion") + implementation("io.vertx:vertx-web-client:$vertxVersion") // MongoDB - compile("io.vertx:vertx-mongo-client:$vertxVersion") - compile("de.flapdoodle.embed:de.flapdoodle.embed.mongo:2.2.0") + implementation("io.vertx:vertx-mongo-client:$vertxVersion") + implementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo:2.2.0") // Redis - compile("io.vertx:vertx-redis-client:$vertxVersion") - compile("it.ozimov:embedded-redis:0.7.2") + implementation("io.vertx:vertx-redis-client:$vertxVersion") + implementation("it.ozimov:embedded-redis:0.7.2") // Handlebars - compile("io.vertx:vertx-web-templ-handlebars:$vertxVersion") - compile("com.github.jknack:handlebars:2.1.0") + implementation("io.vertx:vertx-web-templ-handlebars:$vertxVersion") + implementation("com.github.jknack:handlebars:2.1.0") // RSS - compile("com.rometools:rome:1.5.0") -} + implementation("com.rometools:rome:1.12.2") -application { - mainClassName = "io.vertx.core.Launcher" + runtimeOnly("ch.qos.logback:logback-classic:1.2.3") } - -tasks.shadowJar { - classifier = "" - manifest { - attributes["Main-Verticle"] = "io.vertx.examples.feeds.verticles.MainVerticle" - } - mergeServiceFiles { - include("META-INF/services/io.vertx.core.spi.VerticleFactory") - } - into("webroot") { - from("webroot") - } - into("templates") { - from("templates") - } +application { + mainClassName = "io.vertx.core.Launcher" } -tasks.withType { - args = listOf("run", mainVerticle) -} -tasks.withType { - gradleVersion = "5.1.1" +tasks { + shadowJar { + archiveClassifier.set("") + manifest { + attributes["Main-Verticle"] = "io.vertx.examples.feeds.verticles.MainVerticle" + } + mergeServiceFiles { + include("META-INF/services/io.vertx.core.spi.VerticleFactory") + } + into("webroot") { + from("webroot") + } + into("templates") { + from("templates") + } + } + withType { + args = listOf("run", mainVerticle) + } + withType { + gradleVersion = "6.4" + } } diff --git a/gradle.properties b/gradle.properties deleted file mode 100644 index f6c4129..0000000 --- a/gradle.properties +++ /dev/null @@ -1 +0,0 @@ -vertxVersion=3.5.0 diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 87b738c..62d4c05 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 558870d..4c5803d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index af6708f..fbd7c51 100755 --- a/gradlew +++ b/gradlew @@ -1,5 +1,21 @@ #!/usr/bin/env sh +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + ############################################################################## ## ## Gradle start up script for UN*X @@ -28,7 +44,7 @@ APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m"' +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" @@ -66,6 +82,7 @@ esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then @@ -109,10 +126,11 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath @@ -138,19 +156,19 @@ if $cygwin ; then else eval `echo args$i`="\"$arg\"" fi - i=$((i+1)) + i=`expr $i + 1` done case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; esac fi @@ -159,14 +177,9 @@ save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } -APP_ARGS=$(save "$@") +APP_ARGS=`save "$@"` # Collect all arguments for the java command, following the shell quoting and substitution rules eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" -# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then - cd "$(dirname "$0")" -fi - exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 6d57edc..5093609 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,3 +1,19 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + @if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem @@ -13,8 +29,11 @@ if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome @@ -65,6 +84,7 @@ set CMD_LINE_ARGS=%* set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + @rem Execute Gradle "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% diff --git a/src/main/java/io/vertx/examples/feeds/dao/MongoDAO.java b/src/main/java/io/vertx/examples/feeds/dao/MongoDAO.java index 20c96c5..7940b1e 100644 --- a/src/main/java/io/vertx/examples/feeds/dao/MongoDAO.java +++ b/src/main/java/io/vertx/examples/feeds/dao/MongoDAO.java @@ -1,16 +1,15 @@ package io.vertx.examples.feeds.dao; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; +import io.vertx.core.Future; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.mongo.MongoClient; -import io.vertx.ext.mongo.MongoClientUpdateResult; -import java.util.List; +import java.util.Optional; public class MongoDAO { + public static final String ID_COLUMN = "_id"; public static final String TABLE_USERS = "users"; public static final String TABLE_FEEDS = "feeds"; public static final String COLUMN_SUBSCRIPTIONS = "subscriptions"; @@ -21,123 +20,98 @@ public MongoDAO(MongoClient mongo) { this.mongo = mongo; } - public void userById(String userId, Handler> handler) { - JsonObject query = new JsonObject(); - query.put("_id", userId); - mongo.findOne(TABLE_USERS, query, null, handler); + public Future> userById(String userId) { + return mongo + .findOne(TABLE_USERS, new JsonObject().put(ID_COLUMN, userId), null) + .map(Optional::ofNullable); } - public void userByLoginAndPwd(String login, String hashedPwd, Handler> handler) { - JsonObject query = new JsonObject(); - query.put("login", login); - query.put("password", hashedPwd); - mongo.findOne(TABLE_USERS, query, null, handler); + public Future> userByLoginAndPwd(String login, String hashedPwd) { + var query = new JsonObject() + .put("login", login) + .put("password", hashedPwd); + return mongo + .findOne(TABLE_USERS, query, null) + .map(Optional::ofNullable); } - public void newUser(String login, String hashedPwd, Handler> handler) { - final JsonObject user = new JsonObject(); - user.put("login", login); - user.put("password", hashedPwd); - user.put(TABLE_FEEDS, new JsonArray()); - mongo.insert(TABLE_USERS, user, handler); + public Future newUser(String login, String hashedPwd) { + var user = new JsonObject() + .put("login", login) + .put("password", hashedPwd) + .put(TABLE_FEEDS, new JsonArray()); + return mongo.insert(TABLE_USERS, user); } - public void getFeed(String feedHash, Handler> handler) { - JsonObject query = new JsonObject(); - query.put("hash", feedHash); - mongo.findOne(TABLE_FEEDS, query, null, handler); - } - - public void updateFeed(String feedHash, JsonObject newValue, Handler> handler) { - JsonObject query = new JsonObject(); - query.put("hash", feedHash); - JsonObject updateQuery = new JsonObject(); - updateQuery.put("$set", newValue); - mongo.updateCollection(TABLE_FEEDS, query, updateQuery, handler); + public Future> getFeedByHash(String feedHash) { + return mongo + .findOne(TABLE_FEEDS, new JsonObject().put("hash", feedHash), null) + .map(Optional::ofNullable); } @SuppressWarnings("unchecked") - public void unsubscribe(JsonObject user, JsonObject subscription, Handler> handler) { - List subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS).getList(); - subscriptions.removeIf(sub -> { - return sub.getString("hash").equals(subscription.getString("hash")); - }); - JsonObject newSubscriptions = new JsonObject(); - newSubscriptions.put("$set", new JsonObject().put(COLUMN_SUBSCRIPTIONS, new JsonArray(subscriptions))); - JsonObject userQuery = new JsonObject(); - userQuery.put("_id", user.getString("_id")); - mongo.findOneAndUpdate(TABLE_USERS, userQuery, newSubscriptions, updateHandler -> { - if (updateHandler.failed()) { - handler.handle(updateHandler); - return; - } - JsonObject feedQuery = new JsonObject(); - feedQuery.put("_id", subscription.getString("_id")); - mongo.findOne(TABLE_FEEDS, feedQuery, null, duplicateHandler -> { - if (duplicateHandler.failed()) { - handler.handle(duplicateHandler); - return; - } - JsonObject feed = duplicateHandler.result(); - JsonObject updateQuery = new JsonObject(); - Integer oldCount = feed.getInteger(COLUMN_SUBSCRIBER_COUNT, 1); - subscription.put(COLUMN_SUBSCRIBER_COUNT, oldCount - 1); - updateQuery.put("_id", feed.getString("_id")); - JsonObject updateValue = new JsonObject(); - updateValue.put("$set", subscription); - mongo.update(TABLE_FEEDS, updateQuery, updateValue, handler::handle); - }); - - }); + public Future unsubscribe(JsonObject user, JsonObject subscription) { + var subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS); + subscriptions.getList().removeIf(sub -> + ((JsonObject)sub).getString("hash").equals(subscription.getString("hash")) + ); + var newSubscriptions = new JsonObject().put("$set", new JsonObject().put(COLUMN_SUBSCRIPTIONS, subscriptions)); + return mongo.findOneAndUpdate(TABLE_USERS, byId(user.getString(ID_COLUMN)), newSubscriptions) + .flatMap(res -> mongo.findOne(TABLE_FEEDS, byId(subscription.getString(ID_COLUMN)), null)) + .flatMap(feed -> { + var oldCount = feed.getInteger(COLUMN_SUBSCRIBER_COUNT, 1); + subscription.put(COLUMN_SUBSCRIBER_COUNT, oldCount - 1); + var updateValue = new JsonObject().put("$set", subscription); + return mongo.updateCollection(TABLE_FEEDS, byId(feed.getString(ID_COLUMN)), updateValue); + }) + .mapEmpty(); } - public void newSubscription(JsonObject user, JsonObject subscription, Handler> handler) { - String urlHash = subscription.getString("hash"); - JsonObject findQuery = new JsonObject(); - findQuery.put("hash", urlHash); - mongo.findOne(TABLE_FEEDS, findQuery, null, findResult -> { - if (findResult.failed()) { - handler.handle(findResult); - return; - } - JsonObject existingFeed = findResult.result(); - if (existingFeed == null) { - subscription.put(COLUMN_SUBSCRIBER_COUNT, 1); - mongo.insert(TABLE_FEEDS, subscription, insertResult -> { - if (insertResult.failed()) { - handler.handle(insertResult); - return; - } - subscription.put("_id", insertResult.result()); - attachSubscriptionToUser(user, subscription, handler); - }); - } else { - JsonObject updateQuery = new JsonObject(); - Integer oldCount = existingFeed.getInteger(COLUMN_SUBSCRIBER_COUNT, 0); - subscription.put(COLUMN_SUBSCRIBER_COUNT, oldCount + 1); - updateQuery.put("_id", existingFeed.getString("_id")); - subscription.put("_id", existingFeed.getString("_id")); - JsonObject updateValue = new JsonObject(); - updateValue.put("$set", subscription); - mongo.findOneAndUpdate(TABLE_FEEDS, updateQuery, updateValue, updateHandler -> { - if (updateHandler.failed()) { - handler.handle(updateHandler); - return; - } - attachSubscriptionToUser(user, subscription, handler); - }); - } - }); + public Future newSubscription(JsonObject user, JsonObject subscription) { + var urlHash = subscription.getString("hash"); + return getFeedByHash(urlHash) + .flatMap(maybeFeed -> { + if (maybeFeed.isEmpty()) { + return createSubscription(subscription, user); + } else { + return updateSubscription(subscription, user, maybeFeed.get()); + } + }) + .mapEmpty(); } - private void attachSubscriptionToUser(JsonObject user, JsonObject subscription, Handler> handler) { - JsonArray subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS, new JsonArray()); - subscriptions.add(subscription); - JsonObject query = new JsonObject(); - query.put("_id", user.getString("_id")); - JsonObject newSubscriptions = new JsonObject(); - newSubscriptions.put("$set", new JsonObject().put(COLUMN_SUBSCRIPTIONS, subscriptions)); - mongo.findOneAndUpdate(TABLE_USERS, query, newSubscriptions, handler::handle); + private Future createSubscription(JsonObject subscription, JsonObject user) { + subscription.put(COLUMN_SUBSCRIBER_COUNT, 1); + return mongo + .insert(TABLE_FEEDS, subscription) + .flatMap(newSubscription -> { + subscription.put(ID_COLUMN, newSubscription); + return attachSubscriptionToUser(user, subscription); + }); + } + + private JsonObject byId(String id) { + return new JsonObject().put(ID_COLUMN, id); + } + + private Future updateSubscription(JsonObject subscription, JsonObject user, JsonObject feed) { + var feedId = feed.getString(ID_COLUMN); + var oldCount = feed.getInteger(COLUMN_SUBSCRIBER_COUNT, 0); + subscription + .put(COLUMN_SUBSCRIBER_COUNT, oldCount + 1) + .put(ID_COLUMN, feedId); + var updateValue = new JsonObject().put("$set", subscription); + return mongo + .findOneAndUpdate(TABLE_FEEDS, byId(feedId), updateValue) + .flatMap(updateHandler -> attachSubscriptionToUser(user, subscription)); + } + + private Future attachSubscriptionToUser(JsonObject user, JsonObject subscription) { + var subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS, new JsonArray()).add(subscription); + var query = new JsonObject().put(ID_COLUMN, user.getString(ID_COLUMN)); + var newSubscriptions = new JsonObject() + .put("$set", new JsonObject().put(COLUMN_SUBSCRIPTIONS, subscriptions)); + return mongo.findOneAndUpdate(TABLE_USERS, query, newSubscriptions); } } diff --git a/src/main/java/io/vertx/examples/feeds/dao/RedisDAO.java b/src/main/java/io/vertx/examples/feeds/dao/RedisDAO.java index d99e2f5..e1d8f88 100644 --- a/src/main/java/io/vertx/examples/feeds/dao/RedisDAO.java +++ b/src/main/java/io/vertx/examples/feeds/dao/RedisDAO.java @@ -1,79 +1,77 @@ package io.vertx.examples.feeds.dao; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; +import io.vertx.core.Future; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.core.logging.Logger; -import io.vertx.core.logging.LoggerFactory; -import io.vertx.examples.feeds.utils.rss.FeedUtils; -import io.vertx.redis.RedisClient; -import io.vertx.redis.op.RangeLimitOptions; +import io.vertx.examples.feeds.utils.VertxUtils; +import io.vertx.examples.feeds.utils.rss.FeedConverters; +import io.vertx.redis.client.RedisAPI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class RedisDAO { private static final Logger LOG = LoggerFactory.getLogger(RedisDAO.class); - private final RedisClient redis; + private final RedisAPI redis; - public RedisDAO(RedisClient redis) { + public RedisDAO(RedisAPI redis) { this.redis = redis; } - public void getEntries(String feedHash, Date from, Date to, Handler> handler) { - String fromStr; - String toStr; + public Future allEntriesForFeed(String feedHash, Date from, Date to) { + var args = new ArrayList(); + args.add(feedHash); if (from != null) { - fromStr = Double.toString((double)from.getTime()); + args.add(Double.toString((double)from.getTime())); } else { - fromStr = "-inf"; + args.add("-inf"); } if (to != null) { - toStr = Double.toString((double)to.getTime()); + args.add(Double.toString((double)to.getTime())); } else { - toStr = "+inf"; + args.add("+inf"); } - redis.zrevrangebyscore(feedHash, toStr, fromStr, RangeLimitOptions.NONE, handler); + return redis + .zrevrangebyscore(args) + .map(res -> res.stream() + .map(entry -> new JsonObject(entry.toString())) + .collect(VertxUtils.JSON_ARRAY_COLLECTOR) + ); } - public void getMaxDate(String feedHash, Handler handler) { - /* - * FIXME : this fails with a ClassCastException use it as soon as RedisClient is fixed - * RangeLimitOptions options = new RangeLimitOptions(); - * options.setLimit(0, 1); - */ - redis.zrevrangebyscore(feedHash, "+inf", "-inf", RangeLimitOptions.NONE, result -> { - if (result.failed()) { - LOG.error("Fetch max date failed : ", result.cause()); - handler.handle(null); - } else { - JsonArray array = result.result(); - if (array.isEmpty()) { - LOG.info("Fetch max date is null, array is empty for feedHash : " + feedHash); - handler.handle(null); - return; - } - JsonObject max = new JsonObject(array.getString(0)); - String published = max.getString("published"); - try { - handler.handle(FeedUtils.getDate(published)); - } catch (ParseException pe) { - LOG.error("Could not fetch max date : ", pe); - handler.handle(null); - } - } - }); + public Future getMaxDate(String feedHash) { + return redis.zrevrangebyscore(Arrays.asList(feedHash, "+inf", "-inf")) + .map(resp -> { + if (resp.size() == 0) { + LOG.info("Fetch max date is null, array is empty for feedHash : " + feedHash); + return null; + } + var max = new JsonObject(resp.get(0).toString()); + var published = max.getString("published"); + try { + return FeedConverters.getDate(published); + } catch (ParseException pe) { + LOG.error("Could not fetch max date : ", pe); + return null; + } + }); } - public void insertEntries(String feedHash, List entries, Handler> handler) { - Map members = new HashMap<>(entries.size()); - entries.forEach(entry -> members.put(entry.toString(), entry.getDouble("score"))); - redis.zaddMany(feedHash, members, handler); + public Future insertEntries(String feedHash, List entries) { + var args = new ArrayList(); + args.add(feedHash); + entries.forEach(entry -> { + args.add(Double.toString(entry.getDouble("score"))); + args.add(entry.toString()); + }); + return redis.zadd(args).mapEmpty(); } + } diff --git a/src/main/java/io/vertx/examples/feeds/handlers/UserContextHandler.java b/src/main/java/io/vertx/examples/feeds/handlers/UserContextHandler.java index c92a15c..7b2df8a 100644 --- a/src/main/java/io/vertx/examples/feeds/handlers/UserContextHandler.java +++ b/src/main/java/io/vertx/examples/feeds/handlers/UserContextHandler.java @@ -1,9 +1,9 @@ package io.vertx.examples.feeds.handlers; -import io.vertx.core.json.JsonObject; import io.vertx.examples.feeds.dao.MongoDAO; import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.Session; + +import static io.vertx.examples.feeds.utils.VertxUtils.failOr; public class UserContextHandler { @@ -13,44 +13,38 @@ public UserContextHandler(MongoDAO mongo) { this.mongo = mongo; } - public void fromApiToken(RoutingContext context) { - String userId; - String token = context.request().params().get("accessToken"); + public void fromApiToken(RoutingContext rc) { + var token = rc.request().params().get("accessToken"); if (token == null) { - context.fail(401); + rc.fail(401); return; } - userId = (String) context.vertx().sharedData().getLocalMap("access_tokens").get(token); + var userId = rc.vertx().sharedData().getLocalMap("access_tokens").get(token); if (userId == null) { - context.fail(401); + rc.fail(401); } else { - findAndInjectUser(context, userId); + findAndInjectUser(rc, userId); } } - public void fromSession(RoutingContext context) { - Session session = context.session(); - String userId = session.get("userId"); + public void fromSession(RoutingContext rc) { + var userId = rc.session().get("userId"); if (userId == null) { - context.fail(401); + rc.fail(401); } else { - findAndInjectUser(context, userId); + findAndInjectUser(rc, userId); } } - private void findAndInjectUser(RoutingContext context, String userId) { - mongo.userById(userId, mongoHandler -> { - if (mongoHandler.failed()) { - context.fail(mongoHandler.cause()); - } else { - JsonObject user = mongoHandler.result(); - if (user == null) { - context.fail(403); - } else { - context.put("user", user); - context.next(); - } - } - }); + private void findAndInjectUser(RoutingContext rc, String userId) { + mongo.userById(userId) + .setHandler(failOr(rc, maybeUser -> { + if (maybeUser.isEmpty()) { + rc.fail(403); + return; + } + rc.put("user", maybeUser.get()); + rc.next(); + })); } } diff --git a/src/main/java/io/vertx/examples/feeds/handlers/api/AuthenticationApi.java b/src/main/java/io/vertx/examples/feeds/handlers/api/AuthenticationApi.java index dcdebaf..4866587 100644 --- a/src/main/java/io/vertx/examples/feeds/handlers/api/AuthenticationApi.java +++ b/src/main/java/io/vertx/examples/feeds/handlers/api/AuthenticationApi.java @@ -1,11 +1,12 @@ package io.vertx.examples.feeds.handlers.api; -import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.JsonObject; +import io.vertx.core.http.HttpHeaders; import io.vertx.examples.feeds.dao.MongoDAO; import io.vertx.examples.feeds.utils.StringUtils; import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.Session; + +import static io.vertx.examples.feeds.dao.MongoDAO.ID_COLUMN; +import static io.vertx.examples.feeds.utils.VertxUtils.failOr; public class AuthenticationApi { @@ -15,67 +16,60 @@ public class AuthenticationApi { private static final String LOGIN = "login"; private static final String PWD = "password"; private static final String ACCESS_TOKEN = "access_token"; - private static final String INDEX = "/index.hbs"; + private static final String INDEX_PAGE = "/index.hbs"; + private static final String LOGIN_PAGE = "/login.hbs"; private static final String USER_ID = "userId"; - public AuthenticationApi(MongoDAO mongo) { this.mongo = mongo; this.strUtils = new StringUtils(); } - public void register(RoutingContext context) { - final String login = context.request().getParam(LOGIN); - final String pwd = context.request().getParam(PWD); - mongo.newUser(login, strUtils.hash256(pwd), result -> { - if (result.failed()) { - context.fail(result.cause()); - } else { - redirectTo(context, "/login.hbs"); - } - }); + public void register(RoutingContext rc) { + var login = rc.request().getParam(LOGIN_PAGE); + var pwd = rc.request().getParam(PWD); + mongo.newUser(login, strUtils.hash256(pwd)) + .setHandler(failOr(rc, result -> redirectTo(rc, LOGIN_PAGE))); } public void login(RoutingContext context) { - final String login = context.request().getParam(LOGIN); - final String pwd = context.request().getParam(PWD); - mongo.userByLoginAndPwd(login, strUtils.hash256(pwd), result -> { - if (result.failed()) { - context.fail(result.cause()); - return; - } - JsonObject user = result.result(); - if (user == null) { - redirectTo(context, "/login.hbs"); - return; - } - Session session = context.session(); - String accessToken = strUtils.generateToken(); - session.put(ACCESS_TOKEN, accessToken); - session.put(LOGIN, login); - session.put(USER_ID, user.getString("_id")); - context.vertx().sharedData().getLocalMap("access_tokens").put(accessToken, user.getString("_id")); - redirectTo(context, INDEX); - }); + var login = context.request().getParam(LOGIN); + var pwd = context.request().getParam(PWD); + mongo.userByLoginAndPwd(login, strUtils.hash256(pwd)) + .setHandler(failOr(context, maybeUser -> { + if (maybeUser.isEmpty()) { + redirectTo(context, LOGIN_PAGE); + return; + } + var user = maybeUser.get(); + var accessToken = strUtils.generateToken(); + var userId = user.getString(ID_COLUMN); + context.session() + .put(ACCESS_TOKEN, accessToken) + .put(LOGIN_PAGE, login) + .put(USER_ID, userId); + context.vertx().sharedData().getLocalMap("access_tokens").put(accessToken, userId); + redirectTo(context, INDEX_PAGE); + })); } - public void logout(RoutingContext context) { - final Session session = context.session(); - session.remove(LOGIN); + public void logout(RoutingContext rc) { + var session = rc.session(); + session.remove(LOGIN_PAGE); session.remove(USER_ID); - String accessToken = session.get(ACCESS_TOKEN); + var accessToken = session.get(ACCESS_TOKEN); if (accessToken != null) { - context.vertx().sharedData().getLocalMap("access_tokens").remove(accessToken); + rc.vertx().sharedData().getLocalMap("access_tokens").remove(accessToken); } session.remove(ACCESS_TOKEN); - redirectTo(context, "/index.hbs"); + redirectTo(rc, INDEX_PAGE); } - private static void redirectTo(RoutingContext context, String url) { - HttpServerResponse response = context.response(); - response.setStatusCode(303); - response.headers().add("Location", url); - response.end(); + private static void redirectTo(RoutingContext rc, String url) { + rc.response() + .setStatusCode(303) + .putHeader(HttpHeaders.LOCATION, url) + .end(); } } diff --git a/src/main/java/io/vertx/examples/feeds/handlers/api/FeedsApi.java b/src/main/java/io/vertx/examples/feeds/handlers/api/FeedsApi.java index 3d0232f..f7df9fa 100644 --- a/src/main/java/io/vertx/examples/feeds/handlers/api/FeedsApi.java +++ b/src/main/java/io/vertx/examples/feeds/handlers/api/FeedsApi.java @@ -1,28 +1,21 @@ package io.vertx.examples.feeds.handlers.api; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.core.logging.Logger; -import io.vertx.core.logging.LoggerFactory; import io.vertx.examples.feeds.dao.MongoDAO; import io.vertx.examples.feeds.dao.RedisDAO; import io.vertx.examples.feeds.utils.StringUtils; import io.vertx.ext.web.RoutingContext; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; +import static io.vertx.examples.feeds.utils.VertxUtils.failOr; /** * Simple CRUD for handling feeds */ public class FeedsApi { - private static final Logger LOG = LoggerFactory.getLogger(FeedsApi.class); - public static final String COLUMN_SUBSCRIPTIONS = "subscriptions"; - public static final String COLUMN_FEED_ID = "feedId"; + public static final String SUBSCRIPTIONS_COLUMN = "subscriptions"; + public static final String FEED_ID_COLUMN = "feedId"; private final MongoDAO mongo; private final RedisDAO redis; @@ -34,118 +27,101 @@ public FeedsApi(MongoDAO mongo, RedisDAO redis) { this.strUtils = new StringUtils(); } - public void create(RoutingContext context) { - JsonObject body = context.getBodyAsJson(); - JsonObject user = context.get("user"); - JsonArray subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS); - final String urlHash = strUtils.hash256(body.getString("url")); + public void create(RoutingContext rc) { + var body = rc.getBodyAsJson(); + var user = extractUser(rc); + var subscriptions = user.getJsonArray(SUBSCRIPTIONS_COLUMN); + var urlHash = strUtils.hash256(body.getString("url")); body.put("hash", urlHash); if (subscriptions != null) { - boolean alreadySubscribed = subscriptions.stream().anyMatch(subscription -> - ((JsonObject) subscription).getString("hash").equals(urlHash) - ); + boolean alreadySubscribed = subscriptions + .stream() + .anyMatch(subscription -> + ((JsonObject) subscription).getString("hash").equals(urlHash) + ); if (alreadySubscribed) { - context.fail(400); + rc.fail(409); return; } } - mongo.newSubscription(user, body, result -> { - if (result.failed()) { - context.fail(result.cause()); - return; - } - context.response().end(body.toString()); - }); + mongo.newSubscription(user, body) + .setHandler(failOr(rc, result -> + rc.response().end(body.toString()) + )); } - public void retrieve(RoutingContext context) { - HttpServerRequest request = context.request(); - String feedId = request.getParam(COLUMN_FEED_ID); + public void retrieve(RoutingContext rc) { + var request = rc.request(); + var feedId = request.getParam(FEED_ID_COLUMN); if (feedId == null) { - context.fail(400); + rc.fail(400); return; } - JsonObject user = context.get("user"); - JsonObject subscription = getSubscription(user, context); + var user = extractUser(rc); + var subscription = getSubscription(user, rc); if (subscription != null) { - context.response().end(subscription.toString()); + rc.response().end(subscription.toString()); } } - public void update(RoutingContext context) { - context.response().end("TODO"); - } - - public void delete(RoutingContext context) { - JsonObject user = context.get("user"); - JsonObject subscription = getSubscription(user, context); + public void delete(RoutingContext rc) { + var user = extractUser(rc); + var subscription = getSubscription(user, rc); if (subscription == null) { return; } - String feedId = context.request().getParam(COLUMN_FEED_ID); + var feedId = rc.request().getParam(FEED_ID_COLUMN); subscription.put("hash", feedId); - mongo.unsubscribe(user, subscription, result -> { - if (result.failed()) { - context.fail(result.cause()); - return; - } - context.response().end(subscription.toString()); - }); + mongo.unsubscribe(user, subscription) + .setHandler(failOr(rc, r -> rc.response().end(subscription.toString()))); } - public void list(RoutingContext context) { - JsonObject user = context.get("user"); - JsonArray subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS); + public void list(RoutingContext rc) { + var user = extractUser(rc); + var subscriptions = user.getJsonArray(SUBSCRIPTIONS_COLUMN); if (subscriptions == null) { subscriptions = new JsonArray(); } - HttpServerResponse response = context.response(); - response.end(subscriptions.toString()); + rc.response().end(subscriptions.toString()); } - public void entries(RoutingContext context) { - JsonObject user = context.get("user"); - JsonObject feed = getSubscription(user, context); + public void entries(RoutingContext rc) { + var user = extractUser(rc); + var feed = getSubscription(user, rc); if (feed != null) { - String feedId = context.request().getParam(COLUMN_FEED_ID); - redis.getEntries(feedId, null, null, handler -> { - if (handler.failed()) { - context.fail(handler.cause()); - } else { - JsonArray orig = handler.result(); - List list = new ArrayList<>(orig.size()); - orig.forEach(val -> { - LOG.info("found val : " + val); - list.add(new JsonObject(val.toString())); - }); - context.response().end(list.toString()); - } - }); + var feedId = rc.request().getParam(FEED_ID_COLUMN); + redis.allEntriesForFeed(feedId, null, null) + .setHandler(failOr(rc, entries -> rc.response().end(entries.encode()))); } } - private static JsonObject getSubscription(JsonObject user, RoutingContext context) { - String feedId = context.request().getParam(COLUMN_FEED_ID); + private static JsonObject getSubscription(JsonObject user, RoutingContext rc) { + var feedId = rc.request().getParam(FEED_ID_COLUMN); if (feedId == null) { - context.fail(400); + rc.fail(400); return null; } - JsonArray subscriptions = user.getJsonArray(COLUMN_SUBSCRIPTIONS); + var subscriptions = user.getJsonArray(SUBSCRIPTIONS_COLUMN); if (subscriptions == null) { - context.fail(404); + rc.fail(404); return null; } - Optional optional = subscriptions.stream().filter(sub -> { - JsonObject subscription = (JsonObject) sub; - return subscription.getString("hash").equals(feedId); - }).findFirst(); + var optional = subscriptions + .stream() + .filter(sub -> ((JsonObject)sub).getString("hash").equals(feedId)) + .findFirst(); if (optional.isPresent()) { // OK, it's one of user's subscription return (JsonObject) optional.get(); } else { // either the feed doesn't exist, or the user tries to access // someone else's feed -> no distinction - context.fail(403); + rc.fail(403); return null; } } + + private JsonObject extractUser(RoutingContext rc) { + return rc.get("user"); + } + } diff --git a/src/main/java/io/vertx/examples/feeds/utils/RedisUtils.java b/src/main/java/io/vertx/examples/feeds/utils/RedisUtils.java index 469291f..6244d51 100644 --- a/src/main/java/io/vertx/examples/feeds/utils/RedisUtils.java +++ b/src/main/java/io/vertx/examples/feeds/utils/RedisUtils.java @@ -1,15 +1,15 @@ package io.vertx.examples.feeds.utils; import io.vertx.core.json.JsonObject; -import io.vertx.redis.RedisOptions; +import io.vertx.redis.client.RedisOptions; + +import java.util.Collections; public class RedisUtils { public static RedisOptions createRedisOptions(JsonObject conf) { - RedisOptions options = new RedisOptions(); - options.setHost(conf.getString("host")); - options.setPort(conf.getInteger("port")); - return options; + var endpoints = Collections.singletonList("redis://" + conf.getString("host") + ":" + conf.getInteger("port")); + return new RedisOptions().setEndpoints(endpoints); } } diff --git a/src/main/java/io/vertx/examples/feeds/utils/StringUtils.java b/src/main/java/io/vertx/examples/feeds/utils/StringUtils.java index e0b9786..c21bd19 100644 --- a/src/main/java/io/vertx/examples/feeds/utils/StringUtils.java +++ b/src/main/java/io/vertx/examples/feeds/utils/StringUtils.java @@ -1,7 +1,7 @@ package io.vertx.examples.feeds.utils; -import java.io.UnsupportedEncodingException; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; @@ -22,12 +22,8 @@ public StringUtils() { public String hash256(String str) { sha256.reset(); - try { - sha256.update(str.getBytes("UTF-8")); - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException("UTF-8 is not supported by this platform", uee); - } - byte[] digest = sha256.digest(); + sha256.update(str.getBytes(StandardCharsets.UTF_8)); + byte[] digest = sha256.digest(); return toHexString(digest); } diff --git a/src/main/java/io/vertx/examples/feeds/utils/VertxUtils.java b/src/main/java/io/vertx/examples/feeds/utils/VertxUtils.java new file mode 100644 index 0000000..4cceae2 --- /dev/null +++ b/src/main/java/io/vertx/examples/feeds/utils/VertxUtils.java @@ -0,0 +1,27 @@ +package io.vertx.examples.feeds.utils; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonArray; +import io.vertx.ext.web.RoutingContext; + +import java.util.stream.Collector; + +public class VertxUtils { + + private VertxUtils() {} + + public static Collector JSON_ARRAY_COLLECTOR = + Collector.of(JsonArray::new, JsonArray::add, JsonArray::addAll); + + public static Handler> failOr(RoutingContext rc, Handler handler) { + return res -> { + if (res.failed()) { + rc.fail(res.cause()); + return; + } + handler.handle(res.result()); + }; + } + +} diff --git a/src/main/java/io/vertx/examples/feeds/utils/rss/FeedConverters.java b/src/main/java/io/vertx/examples/feeds/utils/rss/FeedConverters.java new file mode 100644 index 0000000..9d06bee --- /dev/null +++ b/src/main/java/io/vertx/examples/feeds/utils/rss/FeedConverters.java @@ -0,0 +1,80 @@ +package io.vertx.examples.feeds.utils.rss; + +import com.rometools.rome.feed.synd.SyndEntry; +import com.rometools.rome.feed.synd.SyndFeed; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class FeedConverters { + + public static final SimpleDateFormat isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + private static final Logger LOG = LoggerFactory.getLogger(FeedConverters.class); + + public static JsonObject toJson(SyndFeed feed) { + var json = new JsonObject() + .put("title", feed.getTitle()) + .put("description", feed.getDescription()); + var image = feed.getImage(); + if (image != null) { + json.put("image", feed.getImage().getUrl()); + // or getLink() ? + } + return json.put("published", toJson(feed.getPublishedDate())); + + } + + public static List toJson(List entries, Date maxDate) { + return entries + .stream() + .flatMap(entry -> { + var published = entry.getPublishedDate(); + if (published == null) { + published = entry.getUpdatedDate(); + } + if (maxDate == null || (published != null && published.compareTo(maxDate) > 0)) { + LOG.info("maxDate = {}", maxDate); + return Stream.of(toJson(entry)); + } else { + return Stream.empty(); + } + }) + .collect(Collectors.toList()); + } + + static JsonObject toJson(SyndEntry entry) { + var json = new JsonObject().put("title", entry.getTitle()); + var published = entry.getPublishedDate(); + if (published == null) { + LOG.warn("!!!!!! The RSS has no published date : this will lead to duplicates entry"); + published = new Date(); // Can't do anything better here, but next time the RSS is fetched, new entries will be added + } + json.put("published", toJson(published)) + .put("score", (double)published.getTime()) + .put("link", entry.getLink()); + var description = entry.getDescription(); + if (description != null) { + json.put("description", description.getValue()); + } + return json; + } + + public static Date getDate(String isoDate) throws ParseException { + return isoDateFormat.parse(isoDate); + } + + public static String toJson(Date date) { + if (date == null) { + return null; + } + return isoDateFormat.format(date); + } +} diff --git a/src/main/java/io/vertx/examples/feeds/utils/rss/FeedUtils.java b/src/main/java/io/vertx/examples/feeds/utils/rss/FeedUtils.java deleted file mode 100644 index 00b98bf..0000000 --- a/src/main/java/io/vertx/examples/feeds/utils/rss/FeedUtils.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.vertx.examples.feeds.utils.rss; - -import com.rometools.rome.feed.synd.SyndContent; -import com.rometools.rome.feed.synd.SyndEntry; -import com.rometools.rome.feed.synd.SyndFeed; -import com.rometools.rome.feed.synd.SyndImage; -import io.vertx.core.json.JsonObject; -import io.vertx.core.logging.Logger; -import io.vertx.core.logging.LoggerFactory; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -public interface FeedUtils { - - Logger LOG = LoggerFactory.getLogger(FeedUtils.class); - SimpleDateFormat isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); - - static JsonObject toJson(SyndFeed feed) { - JsonObject json = new JsonObject(); - json.put("title", feed.getTitle()); - json.put("description", feed.getDescription()); - SyndImage image = feed.getImage(); - if (image != null) { - json.put("image", feed.getImage().getUrl()); - // or getLink() ? - } - json.put("published", toJson(feed.getPublishedDate())); - return json; - - } - - static List toJson(List entries, Date maxDate) { - List result = new ArrayList<>(entries.size()); - entries.forEach(entry -> { - Date published = entry.getPublishedDate(); - if (published == null) { - published = entry.getUpdatedDate(); - } - if (maxDate == null || (published != null && published.compareTo(maxDate) > 0)) { - LOG.info("maxDate = " + maxDate); - result.add(toJson(entry)); - } - }); - return result; - } - - static JsonObject toJson(SyndEntry entry) { - JsonObject json = new JsonObject(); - json.put("title", entry.getTitle()); - Date published = entry.getPublishedDate(); - if (published == null) { - // TODO : log warning ? use another date ? - LOG.warn("!!!!!! The RSS has no published date : this will lead to duplicates entry"); - published = new Date(); // FIXME : absolutely wrong... - } - json.put("published", toJson(published)); - json.put("score", (double)published.getTime()); - json.put("link", entry.getLink()); - SyndContent description = entry.getDescription(); - if (description != null) { - json.put("description", description.getValue()); - } - return json; - } - - static Date getDate(String isoDate) throws ParseException { - return isoDateFormat.parse(isoDate); - } - - static String toJson(Date date) { - if (date == null) { - return null; - } - return isoDateFormat.format(date); - } -} diff --git a/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedMongo.java b/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedMongo.java index e83c5f2..df066b4 100644 --- a/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedMongo.java +++ b/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedMongo.java @@ -1,30 +1,28 @@ package io.vertx.examples.feeds.verticles; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; - -import java.io.IOException; - import de.flapdoodle.embed.mongo.MongodExecutable; import de.flapdoodle.embed.mongo.MongodStarter; import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; import de.flapdoodle.embed.mongo.config.Net; import de.flapdoodle.embed.mongo.distribution.Version; import de.flapdoodle.embed.process.runtime.Network; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; + +import java.io.IOException; public class EmbeddedMongo extends AbstractVerticle { private MongodExecutable mongod; @Override - public void start(Future future) { - MongodStarter starter = MongodStarter.getDefaultInstance(); - + public void start(Promise future) { + var starter = MongodStarter.getDefaultInstance(); try { - int port = MainVerticle.MONGO_PORT; - MongodConfigBuilder builder = new MongodConfigBuilder(); - builder.version(Version.Main.PRODUCTION); - builder.net(new Net(port, Network.localhostIsIPv6())); + var port = MainVerticle.MONGO_PORT; + var builder = new MongodConfigBuilder() + .version(Version.Main.PRODUCTION) + .net(new Net(port, Network.localhostIsIPv6())); mongod = starter.prepare(builder.build()); mongod.start(); } catch (IOException ioe) { @@ -35,7 +33,7 @@ public void start(Future future) { } @Override - public void stop(Future future) { + public void stop(Promise future) { if (mongod != null) { mongod.stop(); mongod = null; diff --git a/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedRedis.java b/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedRedis.java index 03b12c8..634e1c8 100644 --- a/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedRedis.java +++ b/src/main/java/io/vertx/examples/feeds/verticles/EmbeddedRedis.java @@ -2,14 +2,16 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; +import io.vertx.core.Promise; import redis.embedded.RedisServer; public class EmbeddedRedis extends AbstractVerticle { + private RedisServer server; @Override - public void start(Future future) { + public void start(Promise future) { try { server = new RedisServer(MainVerticle.REDIS_PORT); server.start(); // seems to be blocking @@ -20,7 +22,7 @@ public void start(Future future) { } @Override - public void stop(Future future) { + public void stop(Promise future) { if (server != null) { server.stop(); server = null; diff --git a/src/main/java/io/vertx/examples/feeds/verticles/FeedBroker.java b/src/main/java/io/vertx/examples/feeds/verticles/FeedBroker.java index b43eac0..4223690 100644 --- a/src/main/java/io/vertx/examples/feeds/verticles/FeedBroker.java +++ b/src/main/java/io/vertx/examples/feeds/verticles/FeedBroker.java @@ -1,23 +1,29 @@ package io.vertx.examples.feeds.verticles; -import com.rometools.rome.feed.synd.SyndFeed; import com.rometools.rome.io.FeedException; import com.rometools.rome.io.SyndFeedInput; -import io.vertx.core.*; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpHeaders; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.core.logging.Logger; -import io.vertx.core.logging.LoggerFactory; import io.vertx.examples.feeds.dao.RedisDAO; import io.vertx.examples.feeds.utils.RedisUtils; -import io.vertx.examples.feeds.utils.rss.FeedUtils; +import io.vertx.examples.feeds.utils.rss.FeedConverters; import io.vertx.ext.mongo.MongoClient; -import io.vertx.redis.RedisClient; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.StringReader; import java.net.MalformedURLException; @@ -38,7 +44,7 @@ public class FeedBroker extends AbstractVerticle { private MongoClient mongo; private RedisDAO redis; private Long timerId; - private Map clients; + private Map clients; @Override public void init(Vertx vertx, Context context) { @@ -48,48 +54,61 @@ public void init(Vertx vertx, Context context) { } @Override - public void start(Future future) { + public void start(Promise promise) { mongo = MongoClient.createShared(vertx, config.getJsonObject("mongo")); - redis = new RedisDAO(RedisClient.create(vertx, RedisUtils.createRedisOptions(config.getJsonObject("redis")))); - fetchFeeds(); - future.complete(); + Redis.createClient(vertx, RedisUtils.createRedisOptions(config.getJsonObject("redis"))) + .connect() + .map(api -> { + this.redis = new RedisDAO(RedisAPI.api(api)); + fetchFeedsPeriodically(); + return api; + }) + .mapEmpty() + .setHandler(promise); } @Override - public void stop(Future future) { + public void stop() { if (timerId != null) { vertx.cancelTimer(timerId); } mongo.close(); - clients.forEach((url, client) -> client.close()); - future.complete(); + clients.values().forEach(WebClient::close); } - private void fetchFeeds() { - JsonObject crit = new JsonObject(); - JsonObject gt0 = new JsonObject(); - gt0.put("$gt", 0); - crit.put("subscriber_count", gt0); - mongo.find("feeds", crit, result -> { - if (result.failed()) { - LOG.error("Could not retrieve feed list from Mongo", result.cause()); - } else { - this.readFeeds(result.result()); - } + private void fetchFeedsPeriodically() { + var clientsWithAtLeast1Subscription = new JsonObject().put("subscriber_count", new JsonObject().put("$gt", 0)); + vertx.setPeriodic(POLL_PERIOD, timerId -> { + this.timerId = timerId; + mongo.find("feeds", clientsWithAtLeast1Subscription) + .setHandler(res -> { + if (res.failed()) { + LOG.error("Could not fetch feeds from Mongo", res.cause()); + return; + } + this.readFeeds(res.result()) + .setHandler(res2 -> { + if (res2.failed()) { + LOG.error("Could not read feeds", res.cause()); + return; + } + LOG.info("Successfully read feeds"); + }); + }); }); } - private void readFeeds(List feeds) { - CompositeFuture.all( - feeds.stream().map(this::readFeed) - .collect(Collectors.toList()) - ).setHandler(fetchResult -> timerId = vertx.setTimer(POLL_PERIOD, timerIdentifier -> fetchFeeds())); - } + private CompositeFuture readFeeds(List feeds) { + return CompositeFuture.all( + feeds.stream() + .map(this::readFeed) + .collect(Collectors.toList()) + ); + } private Future readFeed(JsonObject jsonFeed) { - Future future = Future.future(); - String feedUrl = jsonFeed.getString("url"); - String feedId = jsonFeed.getString("hash"); + var feedUrl = jsonFeed.getString("url"); + var feedId = jsonFeed.getString("hash"); URL url; try { url = new URL(feedUrl); @@ -97,58 +116,53 @@ private Future readFeed(JsonObject jsonFeed) { LOG.warn("Invalid url : " + feedUrl, mfe); return Future.failedFuture(mfe); } - - redis.getMaxDate(feedId, maxDate -> getXML(url, response -> { - int status = response.statusCode(); - if (status < 200 || status >= 300) { - if (future != null) { - future.fail(new RuntimeException("Could not read feed " + feedUrl + ". Response status code : " + status)); - } - return; - } - response.bodyHandler(buffer -> this.parseXmlFeed(buffer, maxDate, url, feedId, future)); - })); - return future; + return redis + .getMaxDate(feedId) + .flatMap(maxDate -> + getXML(url).flatMap(buffer -> + this.parseXmlFeed(buffer, maxDate, url, feedId) + ) + ) + .mapEmpty(); } - private void parseXmlFeed(Buffer buffer, Date maxDate, URL url, String feedId, Future future) { - String xmlFeed = buffer.toString("UTF-8"); - StringReader xmlReader = new StringReader(xmlFeed); - SyndFeedInput feedInput = new SyndFeedInput(); + private Future parseXmlFeed(Buffer buffer, Date maxDate, URL url, String feedId) { + var xmlFeed = buffer.toString("UTF-8"); + var xmlReader = new StringReader(xmlFeed); + var feedInput = new SyndFeedInput(); try { - SyndFeed feed = feedInput.build(xmlReader); - JsonObject feedJson = FeedUtils.toJson(feed); - LOG.info(feedJson); - List jsonEntries = FeedUtils.toJson(feed.getEntries(), maxDate); - LOG.info("Insert " + jsonEntries.size() + " entries into Redis"); + var feed = feedInput.build(xmlReader); + var feedJson = FeedConverters.toJson(feed); + LOG.info("Read feed {}", feedJson); + var jsonEntries = FeedConverters.toJson(feed.getEntries(), maxDate); + LOG.info("Insert {} entries into Redis", jsonEntries.size()); if (jsonEntries.isEmpty()) { - future.complete(); - return; + return Future.succeededFuture(); } vertx.eventBus().publish(feedId, new JsonArray(jsonEntries)); - redis.insertEntries(feedId, jsonEntries, handler -> { - if (handler.failed()) { - LOG.error("Insert failed", handler.cause()); - future.fail(handler.cause()); - } else { - future.complete(); - } - }); + return redis.insertEntries(feedId, jsonEntries).mapEmpty(); } catch (FeedException fe) { LOG.error("Exception while reading feed : " + url.toString(), fe); - future.fail(fe); + return Future.failedFuture(fe); } } - private void getXML(URL url, Handler responseHandler) { - client(url) - .get(url.getPath(), responseHandler) - .putHeader(HttpHeaders.ACCEPT, "application/xml") - .end(); + private Future getXML(URL url) { + return client(url) + .get(url.getPath()) + .putHeader(HttpHeaders.ACCEPT.toString(), "application/xml") + .send() + .flatMap(response -> { + var status = response.statusCode(); + if (status < 200 || status >= 300) { + return Future.failedFuture(new RuntimeException("Could not read feed " + url + ". Response status code : " + status)); + } + return Future.succeededFuture(response.bodyAsBuffer()); + }); } - private HttpClient client(URL url) { - HttpClient client = clients.get(url.getHost()); + private WebClient client(URL url) { + var client = clients.get(url.getHost()); if (client == null) { client = createClient(url); clients.put(url.getHost(), client); @@ -156,9 +170,9 @@ private HttpClient client(URL url) { return client; } - private HttpClient createClient(URL url) { - final HttpClientOptions options = new HttpClientOptions(); - options.setDefaultHost(url.getHost()); - return vertx.createHttpClient(options); + private WebClient createClient(URL url) { + return WebClient.create(vertx, + new WebClientOptions().setDefaultHost(url.getHost()) + ); } } diff --git a/src/main/java/io/vertx/examples/feeds/verticles/MainVerticle.java b/src/main/java/io/vertx/examples/feeds/verticles/MainVerticle.java index daf4807..3486822 100644 --- a/src/main/java/io/vertx/examples/feeds/verticles/MainVerticle.java +++ b/src/main/java/io/vertx/examples/feeds/verticles/MainVerticle.java @@ -12,10 +12,11 @@ */ public class MainVerticle extends AbstractVerticle { - public static final int REDIS_PORT = 7878; + public static final int REDIS_PORT = 7474; public static final int MONGO_PORT = 8889; private List deploymentIds; + private DeploymentOptions workerOptions = new DeploymentOptions().setWorker(true); @Override public void init(Vertx vertx, Context context) { @@ -24,101 +25,67 @@ public void init(Vertx vertx, Context context) { } @Override - public void start(Future future) { + public void start(Promise future) { CompositeFuture .all(deployEmbeddedMongo(), deployEmbeddedRedis()) - .setHandler(res -> { - if (res.failed()) { - future.fail(res.cause()); - return; - } - deployFeedBroker().setHandler(future.map(v -> "").completer()); - }); + .flatMap(res -> deployFeedBroker()) + .setHandler(future); } private Future deployEmbeddedRedis() { - Future f = Future.future(); - DeploymentOptions options = new DeploymentOptions(); - options.setWorker(true); - vertx.deployVerticle(EmbeddedRedis.class.getName(), options, f); - return f; + return vertx.deployVerticle(EmbeddedRedis.class.getName(), workerOptions); } private Future deployEmbeddedMongo() { - Future f = Future.future(); - DeploymentOptions options = new DeploymentOptions(); - options.setWorker(true); - vertx.deployVerticle(EmbeddedMongo.class.getName(), options, f); - return f; + return vertx.deployVerticle(EmbeddedMongo.class.getName(), workerOptions); } - private Future deployFeedBroker() { - Future future = Future.future(); - JsonObject dbConfig = new JsonObject(); - dbConfig.put("redis", redisConfig()); - dbConfig.put("mongo", mongoConfig()); - DeploymentOptions brokerOptions = new DeploymentOptions(); - brokerOptions.setConfig(dbConfig); - vertx.deployVerticle(FeedBroker.class.getName(), brokerOptions, brokerResult -> { - if (brokerResult.failed()) { - future.fail(brokerResult.cause()); - } else { - deploymentIds.add(brokerResult.result()); - DeploymentOptions webserverOptions = new DeploymentOptions(); - webserverOptions.setConfig(dbConfig); - vertx.deployVerticle(WebServer.class.getName(), webserverOptions, serverResult -> { - if (serverResult.failed()) { - future.fail(serverResult.cause()); - } else { - deploymentIds.add(serverResult.result()); - future.complete(serverResult.result()); - } - }); - } - }); - return future; + private Future deployFeedBroker() { + var dbConfig = new JsonObject() + .put("redis", REDIS_CONF) + .put("mongo", MONGO_CONF); + var options = new DeploymentOptions().setConfig(dbConfig); + return vertx + .deployVerticle(FeedBroker.class.getName(), options) + .flatMap(deploymentId -> { + deploymentIds.add(deploymentId); + return vertx + .deployVerticle(WebServer.class.getName(), options) + .map(serverDeploymentId -> { + deploymentIds.add(serverDeploymentId); + return serverDeploymentId; + }) + .mapEmpty(); + }); } @Override - public void stop(Future future) { - CompositeFuture.all( - deploymentIds - .stream() - .map(this::undeploy) - .collect(Collectors.toList()) - ).setHandler(future.map(c -> null).completer()); - + public void stop(Promise future) { + CompositeFuture + .all( + deploymentIds + .stream() + .map(vertx::undeploy) + .collect(Collectors.toList()) + ) + .mapEmpty() + .setHandler(future); } - private Future undeploy(String deploymentId) { - Future future = Future.future(); - vertx.undeploy(deploymentId, res -> { - if (res.succeeded()) { - future.complete(); - } else { - future.fail(res.cause()); - } - }); - return future; - } + private static final JsonObject MONGO_CONF = + new JsonObject() + .put("host", "localhost") + .put("port", MONGO_PORT) + .put("db_name", "vertx-feeds"); - private static JsonObject mongoConfig() { - JsonObject config = new JsonObject(); - config.put("host", "localhost"); - config.put("port", MONGO_PORT); - config.put("db_name", "vertx-feeds"); - return config; - } - - private static JsonObject redisConfig() { - JsonObject config = new JsonObject(); - config.put("host", "localhost"); - config.put("port", REDIS_PORT); - return config; - } + private static final JsonObject REDIS_CONF = + new JsonObject() + .put("host", "localhost") + .put("port", REDIS_PORT); public static void main(String... args) { - Vertx v = Vertx.vertx(); - v.deployVerticle(MainVerticle.class.getName()); + Vertx.vertx() + .deployVerticle(MainVerticle.class.getName()); } + } diff --git a/src/main/java/io/vertx/examples/feeds/verticles/WebServer.java b/src/main/java/io/vertx/examples/feeds/verticles/WebServer.java index 0268fcd..2876db9 100644 --- a/src/main/java/io/vertx/examples/feeds/verticles/WebServer.java +++ b/src/main/java/io/vertx/examples/feeds/verticles/WebServer.java @@ -2,7 +2,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Context; -import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; @@ -16,14 +16,18 @@ import io.vertx.ext.bridge.PermittedOptions; import io.vertx.ext.mongo.MongoClient; import io.vertx.ext.web.Router; -import io.vertx.ext.web.Session; -import io.vertx.ext.web.handler.*; +import io.vertx.ext.web.handler.BodyHandler; +import io.vertx.ext.web.handler.ErrorHandler; +import io.vertx.ext.web.handler.SessionHandler; +import io.vertx.ext.web.handler.StaticHandler; +import io.vertx.ext.web.handler.TemplateHandler; import io.vertx.ext.web.handler.sockjs.BridgeOptions; import io.vertx.ext.web.handler.sockjs.SockJSHandler; import io.vertx.ext.web.sstore.LocalSessionStore; import io.vertx.ext.web.sstore.SessionStore; import io.vertx.ext.web.templ.handlebars.HandlebarsTemplateEngine; -import io.vertx.redis.RedisClient; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; @@ -45,49 +49,44 @@ public void init(Vertx vertx, Context context) { } @Override - public void start(Future future) { + public void start(Promise future) { mongo = new MongoDAO(MongoClient.createShared(vertx, config.getJsonObject("mongo"))); - RedisDAO redis = new RedisDAO(RedisClient.create(vertx, RedisUtils.createRedisOptions(config.getJsonObject("redis")))); + var redisCli = Redis.createClient(vertx, RedisUtils.createRedisOptions(config.getJsonObject("redis"))); + var redis = new RedisDAO(RedisAPI.api(redisCli)); authApi = new AuthenticationApi(mongo); feedsApi = new FeedsApi(mongo, redis); server = vertx.createHttpServer(createOptions()); - server.requestHandler(createRouter()); - server.listen(result -> { - if (result.succeeded()) { - future.complete(); - } else { - future.fail(result.cause()); - } - }); + server.requestHandler(createRouter()) + .listen() + .mapEmpty() + .setHandler(future); } @Override - public void stop(Future future) { + public void stop(Promise future) { if (server == null) { future.complete(); return; } - server.close(future.completer()); + server.close(future); } private static HttpServerOptions createOptions() { - HttpServerOptions options = new HttpServerOptions(); - options.setHost("localhost"); - options.setPort(9000); - return options; + return new HttpServerOptions() + .setHost("localhost") + .setPort(9000); } private Router createRouter() { - Router router = Router.router(vertx); + var router = Router.router(vertx); router.route().failureHandler(ErrorHandler.create(true)); /* Static resources */ staticHandler(router); /* Session / cookies for users */ - router.route().handler(CookieHandler.create()); - SessionStore sessionStore = LocalSessionStore.create(vertx); - SessionHandler sessionHandler = SessionHandler.create(sessionStore); + var sessionStore = LocalSessionStore.create(vertx); + var sessionHandler = SessionHandler.create(sessionStore); router.route().handler(sessionHandler); userContextHandler = new UserContextHandler(mongo); @@ -104,30 +103,27 @@ private Router createRouter() { } private SockJSHandler eventBusHandler() { - SockJSHandler handler = SockJSHandler.create(vertx); - BridgeOptions options = new BridgeOptions(); - PermittedOptions permitted = new PermittedOptions(); /* allow everything, we don't care for the demo */ - options.addOutboundPermitted(permitted); - handler.bridge(options); - return handler; + var sockJSHandler = SockJSHandler.create(vertx); + sockJSHandler.bridge( + new BridgeOptions().addOutboundPermitted(new PermittedOptions()) + ); + return sockJSHandler; } private static void staticHandler(Router router) { - StaticHandler staticHandler = StaticHandler.create(); - staticHandler.setCachingEnabled(false); - router.route("/assets/*").handler(staticHandler); + router.route("/assets/*").handler(StaticHandler + .create() + .setCachingEnabled(false)); } private void dynamicPages(Router router) { - HandlebarsTemplateEngine hbsEngine = HandlebarsTemplateEngine.create(vertx); - hbsEngine.setMaxCacheSize(0); /* no cache since we wan't hot-reload for templates */ - TemplateHandler templateHandler = TemplateHandler.create(hbsEngine); + var templateHandler = TemplateHandler.create(HandlebarsTemplateEngine.create(vertx)); router.get("/private/*").handler(userContextHandler::fromSession); - router.getWithRegex(".+\\.hbs").handler(context -> { - final Session session = context.session(); - context.data().put("userLogin", session.get("login")); /* in order to greet him */ - context.data().put("accessToken", session.get("access_token")); /* for api calls */ - context.next(); + router.getWithRegex(".+\\.hbs").handler(rc -> { + var session = rc.session(); + rc.data().put("userLogin", session.get("login")); /* in order to greet him */ + rc.data().put("accessToken", session.get("access_token")); /* for api calls */ + rc.next(); }); router.getWithRegex(".+\\.hbs").handler(templateHandler); } @@ -137,13 +133,13 @@ private Router apiRouter() { * TODO : provide authentication through the AuthService / AuthProvider instead of a custom api handler * TODO : every page except login must be private TODO : use FormLoginHandler for the actual login form TODO : use RedirectAuthHandler for "/private" */ - Router router = Router.router(vertx); + var router = Router.router(vertx); router.route().consumes(APPLICATION_JSON); router.route().produces(APPLICATION_JSON); router.route().handler(BodyHandler.create()); - router.route().handler(context -> { - context.response().headers().add(CONTENT_TYPE, APPLICATION_JSON); - context.next(); + router.route().handler(rc -> { + rc.response().headers().add(CONTENT_TYPE, APPLICATION_JSON); + rc.next(); }); /* login / user-related stuff : no token needed */ @@ -156,10 +152,10 @@ private Router apiRouter() { router.post("/feeds").handler(feedsApi::create); router.get("/feeds").handler(feedsApi::list); router.get("/feeds/:feedId").handler(feedsApi::retrieve); - router.put("/feeds/:feedId").handler(feedsApi::update); router.get("/feeds/:feedId/entries").handler(feedsApi::entries); router.delete("/feeds/:feedId").handler(feedsApi::delete); return router; } + } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..0559438 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + +