Skip to content

Commit

Permalink
Improve rxjava 3 examples
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 6, 2024
1 parent 9d38d0e commit 1821f22
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private Completable insertAndFind() {
new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
);

Single<List<JsonObject>> listSingle = mongo
Single<List<JsonObject>> single = mongo
.rxCreateCollection("users")
.andThen(
// After collection is created we insert each document
Expand All @@ -59,6 +59,6 @@ private Completable insertAndFind() {
.doOnSuccess(results -> {
System.out.println("Results " + results);
});
return listSingle.ignoreElement();
return single.ignoreElement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public Completable rxStart() {
.toMaybe());

// Connect to the database
return resa.doOnSuccess(rowSet -> {
// Subscribe to the final result
System.out.println("Results:");
rowSet.forEach(row -> {
System.out.println(row.toJson());
});
}).ignoreElement();
return resa
.doOnSuccess(rowSet -> {
// Subscribe to the final result
System.out.println("Results:");
rowSet.forEach(row -> {
System.out.println(row.toJson());
});
}).ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.vertx.example.rxjava3.database.mongo;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.vertx.core.json.JsonObject;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.ext.mongo.MongoClient;

import java.util.List;

/*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
Expand All @@ -18,7 +22,7 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {

JsonObject config = new JsonObject()
.put("connection_string", "mongodb://localhost:27018")
Expand All @@ -27,17 +31,17 @@ public void start() throws Exception {
// Create the client
mongo = MongoClient.createShared(vertx, config);

insertAndFind();
return insertAndFind();
}

private void insertAndFind() {
private Completable insertAndFind() {
// Documents to insert
Flowable<JsonObject> documents = Flowable.just(
new JsonObject().put("username", "temporalfox").put("firstname", "Julien").put("password", "bilto"),
new JsonObject().put("username", "purplefox").put("firstname", "Tim").put("password", "wibble")
);

mongo
Maybe<List<JsonObject>> maybe = mongo
.rxCreateCollection("users")
.andThen(
// After collection is created we insert each document
Expand All @@ -52,11 +56,9 @@ private void insertAndFind() {
System.out.println("Insertions done");
return mongo.rxFind("users", new JsonObject());
})
.subscribe(results -> {
.doOnSuccess(results -> {
System.out.println("Results " + results);
}, error -> {
System.out.println("Err");
error.printStackTrace();
});
return maybe.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.database.sqlclient;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.vertx.jdbcclient.JDBCConnectOptions;
import io.vertx.launcher.application.VertxApplication;
Expand All @@ -20,7 +21,7 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {

Pool pool = JDBCPool.pool(vertx, new JDBCConnectOptions().setJdbcUrl("jdbc:hsqldb:mem:test?shutdown=true"), new PoolOptions());

Expand All @@ -33,15 +34,14 @@ public void start() throws Exception {
.toMaybe());

// Connect to the database
resa.subscribe(rowSet -> {
// Subscribe to the final result
System.out.println("Results:");
rowSet.forEach(row -> {
System.out.println(row.toJson());
});
}, err -> {
System.out.println("Database problem");
err.printStackTrace();
});
return resa
.doOnSuccess(rowSet -> {
// Subscribe to the final result
System.out.println("Results:");
rowSet.forEach(row -> {
System.out.println(row.toJson());
});
})
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.vertx.example.rxjava3.database.sqlclient;

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Completable;
import io.vertx.jdbcclient.JDBCConnectOptions;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.rxjava3.core.AbstractVerticle;
Expand All @@ -9,7 +9,6 @@
import io.vertx.sqlclient.PoolOptions;

import java.util.Arrays;
import java.util.function.Function;

/*
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
Expand All @@ -21,7 +20,7 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {

String sql = "CREATE TABLE colors (" +
"id INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, " +
Expand All @@ -31,7 +30,8 @@ public void start() throws Exception {
Pool pool = JDBCPool.pool(vertx, new JDBCConnectOptions().setJdbcUrl("jdbc:hsqldb:mem:test?shutdown=true"), new PoolOptions());

// Connect to the database
pool.rxWithTransaction((Function<SqlConnection, Maybe<RowSet<Row>>>) client -> client
return pool
.rxWithTransaction(client -> client
// Create table
.query(sql).rxExecute()
// Insert colors
Expand All @@ -41,11 +41,12 @@ public void start() throws Exception {
// Get colors if all succeeded
.flatMap(r -> client.query("SELECT * FROM colors").rxExecute())
.toMaybe())// Subscribe to get the final result
.subscribe(rowSet -> {
.doOnSuccess(rowSet -> {
System.out.println("Results:");
rowSet.forEach(row -> {
System.out.println(row.toJson());
});
}, Throwable::printStackTrace);
})
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.http.client.reduce;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.vertx.core.buffer.Buffer;
Expand All @@ -17,9 +18,12 @@ public static void main(String[] args) {
VertxApplication.main(new String[]{Client.class.getName()});
}

private HttpClient client;

@Override
public void start() throws Exception {
HttpClient client = vertx.createHttpClient();
public Completable rxStart() {

client = vertx.createHttpClient();

Maybe<String> maybe = client.rxRequest(HttpMethod.GET, 8080, "localhost", "/")

Expand All @@ -44,6 +48,8 @@ public void start() throws Exception {
.map(buffer -> buffer.toString("UTF-8"))
);

maybe.subscribe(data -> System.out.println("Server content " + data), Throwable::printStackTrace);
return maybe
.doOnSuccess(data -> System.out.println("Server content " + data))
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.http.client.simple;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand All @@ -17,7 +18,7 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {
HttpClient client = vertx.createHttpClient();
Single<Buffer> single = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
Expand All @@ -30,6 +31,8 @@ public void start() throws Exception {
}
return resp.rxBody();
}));
single.subscribe(data -> System.out.println("Server content " + data.toString("UTF-8")), Throwable::printStackTrace);
return single
.doOnSuccess(data -> System.out.println("Server content " + data.toString("UTF-8")))
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.http.client.unmarshalling;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.Json;
Expand All @@ -24,9 +25,12 @@ static class Data {

}

private HttpClient client;

@Override
public void start() throws Exception {
HttpClient client = vertx.createHttpClient();
public Completable rxStart() {

client = vertx.createHttpClient();

Flowable<Data> flowable = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/")
Expand All @@ -37,6 +41,8 @@ public void start() throws Exception {
.map(buffer -> Json.decodeValue(buffer, Data.class))
);

flowable.subscribe(data -> System.out.println("Got response " + data.message), Throwable::printStackTrace);
return flowable
.doOnNext(data -> System.out.println("Got response " + data.message))
.ignoreElements();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.http.client.zip;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand All @@ -19,7 +20,7 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {
HttpClient client = vertx.createHttpClient();

// Send two requests
Expand All @@ -37,8 +38,9 @@ public void start() throws Exception {
.map(Buffer::toJsonObject));

// Combine the responses with the zip into a single response
req1.zipWith(req2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2)).subscribe(json -> {
System.out.println("Got combined result " + json);
}, Throwable::printStackTrace);
return req1
.zipWith(req2, (b1, b2) -> new JsonObject().put("req1", b1).put("req2", b2))
.doOnSuccess(json -> System.out.println("Got combined result " + json))
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.example.rxjava3.http.server.echo;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand All @@ -18,20 +19,22 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {
HttpClient client = vertx.createHttpClient();

Flowable<Buffer> payload = Flowable.just(Buffer.buffer("hello", "UTF-8"));

MultiMap headers = MultiMap.caseInsensitiveMultiMap().add("Content-Type", "text/plain");

client.rxRequest(HttpMethod.PUT, 8080, "localhost", "/")
return client
.rxRequest(HttpMethod.PUT, 8080, "localhost", "/")
.flatMap(req -> {
req.headers().addAll(headers);
return req
.rxSend(payload)
.flatMap(resp -> resp.rxBody());
})
.subscribe(buf -> System.out.println(buf.toString("UTF-8")), Throwable::printStackTrace);
.doOnSuccess(buf -> System.out.println(buf.toString("UTF-8")))
.ignoreElement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.vertx.example.rxjava3.net.greeter;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import io.vertx.launcher.application.VertxApplication;
import io.vertx.rxjava3.core.AbstractVerticle;
Expand All @@ -34,28 +35,27 @@ public static void main(String[] args) {
}

@Override
public void start() throws Exception {
public Completable rxStart() {
Single<NetSocket> sub = vertx.createNetClient().rxConnect(1234, "localhost");
sub.subscribe(socket -> {

return sub.flatMapCompletable(socket -> {
RecordParser parser = RecordParser.newDelimited("\n", socket.toFlowable());

parser
.toFlowable()
.map(buffer -> buffer.toString("UTF-8"))
.subscribe(greeting -> System.out.println("Net client receiving: " + greeting), t -> {
t.printStackTrace();
socket.close();
}, socket::close);

// Now send some data
Stream.of("John", "Joe", "Lisa", "Bill").forEach(name -> {
System.out.println("Net client sending: " + name);
socket.write(name);
socket.write("\n");
});
}, err -> {
System.out.println("Failed to connect " + err);

// Tell the server we are done
socket.write("\n");

return parser
.toFlowable()
.map(buffer -> buffer.toString("UTF-8"))
.doOnNext(greeting -> System.out.println("Net client receiving: " + greeting))
.ignoreElements();
});
}
}
Loading

0 comments on commit 1821f22

Please sign in to comment.