diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java index d77c3bf58c..7346571c8b 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -30,7 +30,8 @@ import io.agroal.api.security.SimplePassword; import io.agroal.pool.DataSource; import io.vertx.core.Vertx; -import io.vertx.ext.jdbc.JDBCClient; +import io.vertx.core.json.JsonObject; +import io.vertx.jdbcclient.JDBCPool; /** * Configuration properties for a JDBC service. @@ -175,14 +176,14 @@ public void setTableName(final String tableName) { } /** - * Creates a JDBC client for configuration properties. + * Creates a JDBC pool for configuration properties. * * @param vertx The vertx instance to use. * @param dataSourceProperties The properties. - * @return The client. + * @return The JDBC pool. * @throws IllegalArgumentException if any of the properties are invalid. */ - public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) { + public static JDBCPool dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) { log.info("Creating new SQL client for table: {}", dataSourceProperties.getTableName()); @@ -213,6 +214,11 @@ public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties data .principal(username) .credential(password))); - return JDBCClient.create(vertx, new DataSource(configuration.get())); + return JDBCPool.pool(vertx, new DataSource(configuration.get()), + new JsonObject() + .put("jdbcUrl", dataSourceProperties.getUrl()) + .put("username", Optional.ofNullable(dataSourceProperties.getUsername()).orElse("")) + .put("database", "") + .put("maxPoolSize", dataSourceProperties.getMaximumPoolSize())); } } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java index 7207672941..9ec8e7d0a3 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -31,9 +31,9 @@ import io.vertx.core.Future; import io.vertx.ext.healthchecks.HealthCheckHandler; import io.vertx.ext.healthchecks.Status; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; import io.vertx.ext.sql.UpdateResult; +import io.vertx.jdbcclient.JDBCPool; /** * An abstract JDBC based data store. @@ -45,7 +45,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl */ public static final String DEFAULT_CHECK_SQL = "SELECT 1"; - private final JDBCClient client; + private final JDBCPool client; private final Tracer tracer; private final ExpandedStatement checkSql; @@ -58,7 +58,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl * @param checkSql An optional SQL statement, which will be used to check if the connection to the * database is OK. It this value is empty, the default statement {@value #DEFAULT_CHECK_SQL} will be used. */ - public AbstractStore(final JDBCClient client, final Tracer tracer, final Optional checkSql) { + public AbstractStore(final JDBCPool client, final Tracer tracer, final Optional checkSql) { this.client = Objects.requireNonNull(client); this.tracer = Objects.requireNonNull(tracer); this.checkSql = checkSql.orElseGet(() -> Statement.statement(DEFAULT_CHECK_SQL)).expand(); diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java index 6efc2d71cc..d11645e1bb 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -37,9 +37,8 @@ import io.opentracing.log.Fields; import io.opentracing.tag.Tags; import io.vertx.core.Future; -import io.vertx.core.Promise; -import io.vertx.ext.sql.SQLClient; -import io.vertx.ext.sql.SQLConnection; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.SqlConnection; /** * SQL helper methods. @@ -84,54 +83,6 @@ public static Future translateException(final Throwable e) { } } - /** - * Enable auto-commit. - * - * @param tracer The tracer. - * @param context The span. - * @param connection The database connection to change. - * @param state The auto-commit state. - * @return A future for tracking the outcome. - */ - public static Future setAutoCommit(final Tracer tracer, final SpanContext context, final SQLConnection connection, final boolean state) { - final Span span = startSqlSpan(tracer, context, "set autocommit", builder -> { - builder.withTag("db.autocommit", state); - }); - final Promise promise = Promise.promise(); - connection.setAutoCommit(state, promise); - return finishSpan(promise.future().map(connection), span, null); - } - - /** - * Perform commit operation. - * - * @param tracer The tracer. - * @param context The span. - * @param connection The database connection to work on. - * @return A future for tracking the outcome. - */ - public static Future commit(final Tracer tracer, final SpanContext context, final SQLConnection connection) { - final Span span = startSqlSpan(tracer, context, "commit", null); - final Promise promise = Promise.promise(); - connection.commit(promise); - return finishSpan(promise.future().map(connection), span, null); - } - - /** - * Perform rollback operation. - * - * @param tracer The tracer. - * @param context The span. - * @param connection The database connection to work on. - * @return A future for tracking the outcome. - */ - public static Future rollback(final Tracer tracer, final SpanContext context, final SQLConnection connection) { - final Span span = startSqlSpan(tracer, context, "rollback", null); - final Promise promise = Promise.promise(); - connection.rollback(promise); - return finishSpan(promise.future().map(connection), span, null); - } - /** * Start a new span for an SQL operation. * @@ -286,40 +237,22 @@ public static boolean hasCauseOf(final Throwable e, final * @param The type of the result. * @return A future, tracking the outcome of the operation. */ - public static Future runTransactionally(final SQLClient client, final Tracer tracer, final SpanContext context, final BiFunction> function) { + public static Future runTransactionally(final JDBCPool client, final Tracer tracer, final SpanContext context, final BiFunction> function) { final Span span = startSqlSpan(tracer, context, "run transactionally", builder -> { }); - final Promise promise = Promise.promise(); - client.getConnection(promise); - - return promise.future() - + return client.withTransaction((connection) -> { // log open connection - .onSuccess(x -> { - final Map log = new HashMap<>(); - log.put(Fields.EVENT, "success"); - log.put(Fields.MESSAGE, "connection opened"); - span.log(log); - }) - - // disable autocommit, which is enabled by default - .flatMap(connection -> SQL.setAutoCommit(tracer, span.context(), connection, false) - - // run code - .flatMap(y -> function.apply(connection, span.context()) - - // commit or rollback ... return original result - .compose( - v -> SQL.commit(tracer, span.context(), connection).map(v), - x -> SQL.rollback(tracer, span.context(), connection).flatMap(unused -> Future.failedFuture(x)))) - - // close the connection - .onComplete(x -> connection.close())) - - .onComplete(x -> span.finish()); - + final Map log = new HashMap<>(); + log.put(Fields.EVENT, "success"); + log.put(Fields.MESSAGE, "connection opened"); + span.log(log); + + // execute function within a transaction + return function.apply(connection, span.context()); + }) + .onComplete(x -> span.finish()); } } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java index c312d33c49..e9b1a70624 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java @@ -36,14 +36,16 @@ import io.opentracing.SpanContext; import io.opentracing.Tracer; import io.opentracing.tag.Tags; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.json.JsonArray; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; import io.vertx.ext.sql.UpdateResult; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; /** * An SQL statement, which can map named parameters to positional parameters. @@ -232,10 +234,6 @@ public Object[] getParameters() { return this.parameters; } - public JsonArray getParametersAsJson() { - return new JsonArray(Arrays.asList(this.parameters)); - } - @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -255,17 +253,6 @@ public ExpandedStatement trace(final Tracer tracer, final SpanContext spanContex return new ExpandedStatement(this.sql, this.parameters, tracer, spanContext); } - @FunctionalInterface - private interface Operation { - void run(String sql, JsonArray params, Handler> handler); - } - - private Future run(final Operation operation) { - final Promise promise = Promise.promise(); - operation.run(this.sql, getParametersAsJson(), promise); - return promise.future(); - } - /** * Start a new span for this SQL statement. * @return The newly created span. @@ -280,30 +267,85 @@ public Span startSqlSpan() { }); } + /** + * Execute this statement as a query. + * @param pool The connection pool to work on. + * @return A future tracking the query result. + */ + public Future query(final JDBCPool pool) { + final Span sqlSpan = startSqlSpan(); + return SQL.finishSpan(pool + .preparedQuery(this.sql) + .execute(Tuple.from(getParameters())) + .map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> { + log.put("rows", r.getNumRows()); + }); + } + /** * Execute this statement as a query. * @param connection The connection to work on. * @return A future tracking the query result. */ - public Future query(final SQLOperations connection) { + public Future query(final SqlConnection connection) { final Span sqlSpan = startSqlSpan(); - return SQL.finishSpan(run(connection::queryWithParams), sqlSpan, (r, log) -> { + return SQL.finishSpan(connection + .preparedQuery(this.sql) + .execute(Tuple.from(getParameters())) + .map(this::convertRowSetToResultSet), sqlSpan, (r, log) -> { log.put("rows", r.getNumRows()); }); } + /** + * Execute this statement as a update. + * @param pool The connection pool to work on. + * @return A future tracking the update result. + */ + public Future update(final JDBCPool pool) { + final Span sqlSpan = startSqlSpan(); + return SQL.finishSpan(pool + .preparedQuery(this.sql) + .execute(Tuple.from(getParameters())) + .map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> { + log.put("rows", r); + }); + } + /** * Execute this statement as a update. * @param connection The connection to work on. * @return A future tracking the update result. */ - public Future update(final SQLOperations connection) { + public Future update(final SqlConnection connection) { final Span sqlSpan = startSqlSpan(); - return SQL.finishSpan(run(connection::updateWithParams), sqlSpan, (r, log) -> { - log.put("rows", r.getUpdated()); + return SQL.finishSpan(connection + .preparedQuery(this.sql) + .execute(Tuple.from(getParameters())) + .map(this::convertRowSetToUpdateResult), sqlSpan, (r, log) -> { + log.put("rows", r); }); } + private ResultSet convertRowSetToResultSet(final RowSet rows) { + final List results = new ArrayList<>(); + rows.forEach(row -> { + final JsonArray values = new JsonArray(); + for (int index = 0; index < row.size(); ++index) { + values.add(row.getValue(index)); + } + results.add(values); + }); + return new ResultSet() + .setColumnNames(rows.columnsNames()) + .setResults(results); + } + + private UpdateResult convertRowSetToUpdateResult(final SqlResult> sqlResult) { + return new UpdateResult() + .setUpdated(sqlResult.rowCount()); + } + } } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java index 7db6a3c195..63338578ee 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -26,9 +26,9 @@ import io.opentracing.SpanContext; import io.opentracing.Tracer; import io.vertx.core.Future; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.SqlConnection; /** * An abstract base for implementing a device registration store. @@ -37,7 +37,7 @@ public abstract class AbstractDeviceStore extends AbstractStore { private static final Logger log = LoggerFactory.getLogger(AbstractDeviceStore.class); - protected final JDBCClient client; + protected final JDBCPool client; protected final Tracer tracer; private final Statement readRegistrationStatement; @@ -49,7 +49,7 @@ public abstract class AbstractDeviceStore extends AbstractStore { * @param tracer The tracer to use. * @param cfg The SQL statement configuration. */ - public AbstractDeviceStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AbstractDeviceStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg.getStatement("checkConnection")); this.client = client; @@ -69,14 +69,14 @@ public AbstractDeviceStore(final JDBCClient client, final Tracer tracer, final S * This will execute the {@code readRegistration} statement and simply * return the result set unprocessed. * - * @param operations The SQL operations to use. + * @param connection The SQL connection to use. * @param key The key to the device entry. * @param span The span to contribute to. * * @return The future, tracking the outcome of the operation. */ - protected Future readDevice(final SQLOperations operations, final DeviceKey key, final Span span) { - return read(operations, key, this.readRegistrationStatement, span.context()); + protected Future readDevice(final SqlConnection connection, final DeviceKey key, final Span span) { + return read(connection, key, this.readRegistrationStatement, span.context()); } /** @@ -86,15 +86,15 @@ protected Future readDevice(final SQLOperations operations, final Dev * return the result set unprocessed. The statement must accept the named * parameters {@code tenant_id} and {@code device_id}. * - * @param operations The SQL operations to use. + * @param connection The SQL connection to use. * @param key The key to the device entry. * @param statement The statement to execute. * @param spanContext The span to contribute to. * * @return The future, tracking the outcome of the operation. */ - protected Future read(final SQLOperations operations, final DeviceKey key, final Statement statement, final SpanContext spanContext) { - return read(operations, key, Optional.empty(), statement, spanContext); + protected Future read(final SqlConnection connection, final DeviceKey key, final Statement statement, final SpanContext spanContext) { + return read(connection, key, Optional.empty(), statement, spanContext); } /** @@ -104,7 +104,7 @@ protected Future read(final SQLOperations operations, final DeviceKey * return the result set unprocessed. The statement must accept the named * parameters {@code tenant_id}, {@code device_id} and {@code expected_version} (if not empty). * - * @param operations The SQL operations to use. + * @param connection The SQL connection to use. * @param key The key to the device entry. * @param statement The statement to execute. * @param resourceVersion An optional resource version to read. @@ -112,7 +112,7 @@ protected Future read(final SQLOperations operations, final DeviceKey * * @return The future, tracking the outcome of the operation. */ - protected Future read(final SQLOperations operations, final DeviceKey key, final Optional resourceVersion, final Statement statement, final SpanContext spanContext) { + protected Future read(final SqlConnection connection, final DeviceKey key, final Optional resourceVersion, final Statement statement, final SpanContext spanContext) { final var expanded = statement.expand(params -> { params.put("tenant_id", key.getTenantId()); @@ -124,7 +124,7 @@ protected Future read(final SQLOperations operations, final DeviceKey return expanded .trace(this.tracer, spanContext) - .query(operations); + .query(connection); } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java index 54ddff0b67..174f38a382 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -37,8 +37,7 @@ import io.opentracing.Tracer; import io.vertx.core.Future; import io.vertx.core.json.Json; -import io.vertx.ext.jdbc.JDBCClient; -import io.vertx.ext.sql.ResultSet; +import io.vertx.jdbcclient.JDBCPool; /** * A data store for devices and credentials, based on a table data model. @@ -59,7 +58,7 @@ public class TableAdapterStore extends AbstractDeviceStore { * @param cfg The SQL statement configuration. * @param dialect Database type, from the JDBC URL scheme */ - public TableAdapterStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg, final String dialect) { + public TableAdapterStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg, final String dialect) { super(client, tracer, cfg); this.dialect = dialect; cfg.dump(log); @@ -79,25 +78,11 @@ public TableAdapterStore(final JDBCClient client, final Tracer tracer, final Sta } - - /** - * Read a device using {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and the - * current SQL client. - * - * @param key The key of the device to read. - * @param span The span to contribute to. - * - * @return The result from {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)}. - */ - protected Future readDevice(final DeviceKey key, final Span span) { - return readDevice(this.client, key, span); - } - /** * Reads the device data. *

* This reads the device data using - * {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and + * {@link #readDevice(io.vertx.sqlclient.SqlConnection, DeviceKey, Span)} and * transforms the plain result into a {@link DeviceReadResult}. *

* If now rows where found, the result will be empty. If more than one row is found, @@ -118,24 +103,27 @@ public Future> readDevice(final DeviceKey key, final .withTag(TracingHelper.TAG_DEVICE_ID, key.getDeviceId()) .start(); - return readDevice(this.client, key, span) - - .>flatMap(r -> { - final var entries = r.getRows(true); - switch (entries.size()) { - case 0: - return Future.succeededFuture(Optional.empty()); - case 1: - final var entry = entries.get(0); - final var device = Json.decodeValue(entry.getString("data"), Device.class); - final var version = Optional.ofNullable(entry.getString("version")); - return Future.succeededFuture(Optional.of(new DeviceReadResult(device, version))); - default: - return Future.failedFuture(new IllegalStateException("Found multiple entries for a single device")); - } - }) - - .onComplete(x -> span.finish()); + return this.client.getConnection().compose(connection -> { + return readDevice(connection, key, span) + + .>flatMap(r -> { + final var entries = r.getRows(true); + switch (entries.size()) { + case 0: + return Future.succeededFuture(Optional.empty()); + case 1: + final var entry = entries.get(0); + final var device = Json.decodeValue(entry.getString("data"), Device.class); + final var version = Optional.ofNullable(entry.getString("version")); + return Future.succeededFuture(Optional.of(new DeviceReadResult(device, version))); + default: + return Future.failedFuture(new IllegalStateException("Found multiple entries for a single device")); + } + }) + + .onComplete(x -> connection.close()) + .onComplete(x -> span.finish()); + }); } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java index 66ad853c4b..c551e9aef2 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -56,11 +56,10 @@ import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLConnection; -import io.vertx.ext.sql.SQLOperations; import io.vertx.ext.sql.UpdateResult; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.SqlConnection; /** * A data store for devices and credentials, based on a table data model. @@ -114,7 +113,7 @@ public class TableManagementStore extends AbstractDeviceStore { * @param tracer The tracer to use. * @param cfg The SQL statement configuration. */ - public TableManagementStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public TableManagementStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); cfg.dump(log); @@ -274,7 +273,7 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final * @param span The span to contribute to. * @return A future tracking the outcome of the operation. */ - protected Future readDeviceForUpdate(final SQLConnection connection, final DeviceKey key, final SpanContext span) { + protected Future readDeviceForUpdate(final SqlConnection connection, final DeviceKey key, final SpanContext span) { return read(connection, key, Optional.empty(), this.readForUpdateStatement, span); } @@ -349,7 +348,7 @@ public Future> createDevice( } private Future createGroups( - final SQLConnection connection, + final SqlConnection connection, final DeviceKey key, final Set memberOf, final SpanContext context) { @@ -375,7 +374,7 @@ private Future createGroups( } - private Future deleteGroups(final SQLConnection connection, + private Future deleteGroups(final SqlConnection connection, final DeviceKey key, final SpanContext context) { @@ -442,10 +441,13 @@ protected Future updateJsonField( .update(this.client); // process result, check optimistic lock - return checkOptimisticLock( - result, span, - resourceVersion, - checkSpan -> readDevice(this.client, key, checkSpan)); + return this.client.getConnection().compose(connection -> { + return checkOptimisticLock( + result, span, + resourceVersion, + checkSpan -> readDevice(connection, key, checkSpan)) + .onComplete(x -> connection.close()); + }); } @@ -530,7 +532,7 @@ public Future> updateDevice( * Reads the device data. *

* This reads the device data using - * {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and + * {@link #readDevice(io.vertx.sqlclient.SqlConnection, DeviceKey, Span)} and * transforms the plain result into a {@link DeviceReadResult}. *

* If now rows where found, the result will be empty. If more than one row is found, @@ -550,23 +552,26 @@ public Future> readDevice(final DeviceKey key, final .withTag(TracingHelper.TAG_DEVICE_ID, key.getDeviceId()) .start(); - return readDevice(this.client, key, span) + return this.client.getConnection().compose(connection -> { + return readDevice(connection, key, span) - .>flatMap(r -> { - final var entries = r.getRows(true); - switch (entries.size()) { - case 0: - return Future.succeededFuture(Optional.empty()); - case 1: - final var entry = entries.get(0); - final JdbcBasedDeviceDto deviceDto = JdbcBasedDeviceDto.forRead(key.getTenantId(), key.getDeviceId(), entry); - return Future.succeededFuture(Optional.of(new DeviceReadResult(deviceDto.getDeviceWithStatus(), Optional.of(deviceDto.getVersion())))); - default: - return Future.failedFuture(new IllegalStateException("Found multiple entries for a single device")); - } - }) + .>flatMap(r -> { + final var entries = r.getRows(true); + switch (entries.size()) { + case 0: + return Future.succeededFuture(Optional.empty()); + case 1: + final var value = entries.get(0); + final JdbcBasedDeviceDto deviceDto = JdbcBasedDeviceDto.forRead(key.getTenantId(), key.getDeviceId(), value); + return Future.succeededFuture(Optional.of(new DeviceReadResult(deviceDto.getDeviceWithStatus(), Optional.of(deviceDto.getVersion())))); + default: + return Future.failedFuture(new IllegalStateException("Found multiple entries for a single device")); + } + }) - .onComplete(x -> span.finish()); + .onComplete(x -> connection.close()) + .onComplete(x -> span.finish()); + }); } @@ -609,15 +614,18 @@ public Future deleteDevice( log.debug("delete - statement: {}", expanded); - final var result = expanded - .trace(this.tracer, span.context()) - .update(this.client); - - return checkOptimisticLock( - result, span, - resourceVersion, - checkSpan -> readDevice(this.client, key, checkSpan)) - .onComplete(x -> span.finish()); + return this.client.getConnection().compose(connection -> { + final var result = expanded + .trace(this.tracer, span.context()) + .update(connection); + + return checkOptimisticLock( + result, span, + resourceVersion, + checkSpan -> readDevice(connection, key, checkSpan)) + .onComplete(x -> connection.close()) + .onComplete(x -> span.finish()); + }); } @@ -650,7 +658,7 @@ public Future dropTenant(final String tenantId, final SpanContext /** * Gets the number of devices that are registered for a tenant. * - * @param operations The SQL operations instance to use. + * @param connection The SQL connection instance to use. * @param tenantId The tenant to count devices for. * @param spanContext The span to contribute to. * @param countStatement The count statement to use. @@ -659,7 +667,7 @@ public Future dropTenant(final String tenantId, final SpanContext * @return A future tracking the outcome of the operation. * @throws NullPointerException if tenant is {@code null}. */ - public Future getDeviceCount(final SQLOperations operations, final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) { + public Future getDeviceCount(final SqlConnection connection, final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) { Objects.requireNonNull(tenantId); @@ -677,7 +685,7 @@ public Future getDeviceCount(final SQLOperations operations, final Stri return expanded .trace(this.tracer, span.context()) - .query(operations) + .query(connection) .map(r -> { final var entries = r.getRows(true); switch (entries.size()) { @@ -812,7 +820,7 @@ public Future> setCredentials( private Future getCredentialsDto( final DeviceKey key, - final SQLConnection connection, + final SqlConnection connection, final Span span) { return readCredentialsStatement @@ -907,11 +915,7 @@ public Future> getCredentials(final DeviceKey ke map.put(DEVICE_ID, key.getDeviceId()); }); - final Promise promise = Promise.promise(); - this.client.getConnection(promise); - - return promise.future() - + return this.client.getConnection() .compose(connection -> readDevice(connection, key, span) // check if we got back a result, if not this will abort early @@ -1009,7 +1013,9 @@ public Future> findDevices(final String tenantId, fin .withTag(TracingHelper.TAG_TENANT_ID, tenantId) .start(); - final Future deviceCountFuture = getDeviceCount(this.client, tenantId, span.context(), countStatement, field, value); + final Future deviceCountFuture = this.client.getConnection() + .compose(connection -> getDeviceCount(connection, tenantId, span.context(), countStatement, field, value) + .onComplete(x -> connection.close())); return deviceCountFuture .compose(count -> expanded.trace(this.tracer, span.context()).query(this.client)) diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java index 5ff99ccc34..317c663e50 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -35,9 +35,9 @@ import io.vertx.core.Future; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.SqlConnection; /** * A data store for tenant information. @@ -49,7 +49,7 @@ public abstract class AbstractTenantStore extends AbstractStore { private static final Logger log = LoggerFactory.getLogger(AbstractTenantStore.class); - protected final JDBCClient client; + protected final JDBCPool client; protected final Tracer tracer; private final Statement readStatement; @@ -62,7 +62,7 @@ public abstract class AbstractTenantStore extends AbstractStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public AbstractTenantStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AbstractTenantStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg.getStatement("checkConnection")); cfg.dump(log); @@ -118,12 +118,12 @@ public Future> readTenant(final String id, final Span /** * Read a tenant, using the provided statement. * - * @param operations The operations to use. + * @param connection The connection to use. * @param expanded The statement to use. * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future> readTenantBy(final SQLOperations operations, final ExpandedStatement expanded, final SpanContext spanContext) { + protected Future> readTenantBy(final SqlConnection connection, final ExpandedStatement expanded, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "read tenant by", getClass().getSimpleName()) .start(); @@ -131,7 +131,7 @@ protected Future> readTenantBy(final SQLOperations op return expanded .trace(this.tracer, span.context()) - .query(operations) + .query(connection) .>flatMap(r -> { final var entries = r.getRows(true); @@ -160,7 +160,7 @@ protected Future> readTenantBy(final SQLOperations op .flatMap(result -> { if (result.isPresent()) { - return fillTrustAnchors(operations, result.get(), span.context()) + return fillTrustAnchors(connection, result.get(), span.context()) .map(Optional::ofNullable); } else { return Future.succeededFuture(result); @@ -177,12 +177,12 @@ protected Future> readTenantBy(final SQLOperations op *

* The result set will contain zero or one rows. * - * @param operations The operations to use. + * @param connection The connection to use. * @param id The ID of the tenant to read. * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future readTenantEntryById(final SQLOperations operations, final String id, final SpanContext spanContext) { + protected Future readTenantEntryById(final SqlConnection connection, final String id, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "read tenant entry", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, id) @@ -194,7 +194,7 @@ protected Future readTenantEntryById(final SQLOperations operations, return expanded .trace(this.tracer, span.context()) - .query(operations) + .query(connection) .onComplete(x -> span.finish()); } @@ -204,12 +204,12 @@ protected Future readTenantEntryById(final SQLOperations operations, *

* The result set will contain zero or more rows. * - * @param operations The operations to use. + * @param connection The connection to use. * @param id The ID of the tenant to read the trust anchors for. * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future readTenantTrustAnchors(final SQLOperations operations, final String id, final SpanContext spanContext) { + protected Future readTenantTrustAnchors(final SqlConnection connection, final String id, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "populate trust anchors", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, id) @@ -223,7 +223,7 @@ protected Future readTenantTrustAnchors(final SQLOperations operation return expanded .trace(this.tracer, span.context()) - .query(operations) + .query(connection) .onComplete(x -> span.finish()); } @@ -231,18 +231,18 @@ protected Future readTenantTrustAnchors(final SQLOperations operation /** * Fill the trust anchors for an already loaded tenant. * - * @param operations The SQL operations to use. + * @param connection The SQL connection to use. * @param tenant The tenant read result to populate. * @param spanContext The span to contribute to. * @return The future, tracking the outcome of the operation. */ protected Future fillTrustAnchors( - final SQLOperations operations, + final SqlConnection connection, final TenantReadResult tenant, final SpanContext spanContext ) { - return readTenantTrustAnchors(operations, tenant.getId(), spanContext) + return readTenantTrustAnchors(connection, tenant.getId(), spanContext) .map(result -> { tenant .getTenant() diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java index c86f58b427..e5183daa8d 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -27,8 +27,7 @@ import io.opentracing.Tracer; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; - +import io.vertx.jdbcclient.JDBCPool; /** * A data store for tenant information. @@ -44,7 +43,7 @@ public class AdapterStore extends AbstractTenantStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public AdapterStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AdapterStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); this.readByTrustAnchorStatement = cfg @@ -99,8 +98,9 @@ private Future> readTenantByTrustAnchor(final String map.put("subject_dn", subjectDn); }); - return readTenantBy(this.client, expanded, spanContext); - + return this.client.getConnection() + .compose(connection -> readTenantBy(connection, expanded, spanContext) + .onComplete(r -> connection.close())); } /** diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java index 5627b885b9..2e8a07ffb5 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2020 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -10,7 +10,9 @@ * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ + package org.eclipse.hono.service.base.jdbc.store.tenant; + import java.io.IOException; import java.net.HttpURLConnection; import java.util.ArrayList; @@ -41,10 +43,9 @@ import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; -import io.vertx.ext.sql.SQLConnection; -import io.vertx.ext.sql.SQLOperations; import io.vertx.ext.sql.UpdateResult; +import io.vertx.jdbcclient.JDBCPool; +import io.vertx.sqlclient.SqlConnection; /** * A data store for tenant management information. @@ -75,7 +76,7 @@ public class ManagementStore extends AbstractTenantStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public ManagementStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public ManagementStore(final JDBCPool client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); @@ -178,11 +179,11 @@ private String tenantToJson(final Tenant tenant) { public Future getTenantCount() { final Promise result = Promise.promise(); - this.client.querySingle(countStatementSql, rs -> { + this.client.query(countStatementSql).execute(rs -> { if (rs.failed()) { result.fail(new IllegalStateException("failed to query number of tenants", rs.cause())); } else { - result.complete(rs.result().getInteger(0)); + result.complete(rs.result().iterator().next().getInteger(0)); } }); return result.future(); @@ -234,7 +235,7 @@ public Future> create(final String tenantId, final Tenant tenant } - private Future deleteAllTrustAnchors(final SQLConnection connection, final String tenantId, final Span span) { + private Future deleteAllTrustAnchors(final SqlConnection connection, final String tenantId, final Span span) { return this.deleteAllTrustAnchorsStatement @@ -246,7 +247,7 @@ private Future deleteAllTrustAnchors(final SQLConnection connection, final } - private Future insertAllTrustAnchors(final SQLConnection connection, final String tenantId, final Tenant tenant, final Span span) { + private Future insertAllTrustAnchors(final SqlConnection connection, final String tenantId, final Tenant tenant, final Span span) { if (tenant.getTrustedCertificateAuthorities() == null || tenant.getTrustedCertificateAuthorities().isEmpty()) { return Future.succeededFuture(); @@ -331,15 +332,18 @@ public Future delete(final String tenantId, final Optional log.debug("delete - statement: {}", expanded); - final var result = expanded - .trace(this.tracer, span.context()) - .update(this.client); - - return checkOptimisticLock( - result, span, - resourceVersion, - checkSpan -> readTenantEntryById(this.client, tenantId, checkSpan.context())) - .onComplete(x -> span.finish()); + return this.client.getConnection().compose(connection -> { + final var result = expanded + .trace(this.tracer, span.context()) + .update(connection); + + return checkOptimisticLock( + result, span, + resourceVersion, + checkSpan -> readTenantEntryById(connection, tenantId, checkSpan.context())) + .onComplete(x -> connection.close()) + .onComplete(x -> span.finish()); + }); } @@ -413,7 +417,7 @@ public Future> update(final String tenantId, final Tenant tenant * {@link org.eclipse.hono.service.base.jdbc.store.OptimisticLockingException} is reported via the future. If the * entity was not found, an update result with zero changes is returned. * - * @param operations The SQL instance to use. + * @param connection The SQL connection to use. * @param tenantId The tenant ID to update. * @param statement The update statement to use. * @param jsonValue The JSON value of the data field. @@ -423,7 +427,7 @@ public Future> update(final String tenantId, final Tenant tenant * @return The future, tracking the outcome of the operation. */ protected Future updateJsonField( - final SQLOperations operations, + final SqlConnection connection, final String tenantId, final Statement statement, final String jsonValue, @@ -443,13 +447,13 @@ protected Future updateJsonField( // execute update final var result = expanded .trace(this.tracer, span.context()) - .update(operations); + .update(connection); // process result, check optimistic lock return checkOptimisticLock( result, span, resourceVersion, - checkSpan -> readTenantEntryById(operations, tenantId, checkSpan.context())); + checkSpan -> readTenantEntryById(connection, tenantId, checkSpan.context())); } /** diff --git a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java index 4182e4d25b..8b3e4478da 100644 --- a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java +++ b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java @@ -31,7 +31,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.ext.jdbc.JDBCClient; +import io.vertx.jdbcclient.JDBCPool; /** * Create the expected database schema if it does not exist from SQL script bundled with the classpath. @@ -102,17 +102,17 @@ private Future loadAndRunScript(final JdbcProperties jdbcProperties, final private Future runScript(final JdbcProperties jdbcProperties, final String script, final SpanContext ctx) { - final JDBCClient jdbcClient = JdbcProperties.dataSource(vertx, jdbcProperties); + final JDBCPool jdbcPool = JdbcProperties.dataSource(vertx, jdbcProperties); - final Promise clientCloseTracker = Promise.promise(); - SQL.runTransactionally(jdbcClient, tracer, ctx, + final Promise poolCloseTracker = Promise.promise(); + SQL.runTransactionally(jdbcPool, tracer, ctx, (connection, context) -> { return Optional.ofNullable(Statement.statement(script)) .map(Statement::expand) .map(stmt -> { log.debug("creating database schema in [{}] using script: {}", jdbcProperties.getUrl(), stmt); return stmt - .query(jdbcClient) + .query(connection) .recover(SQL::translateException); }) .orElseGet(() -> { @@ -121,8 +121,8 @@ private Future runScript(final JdbcProperties jdbcProperties, final String return Future.failedFuture("cannot create database schema using script"); }); }) - .onComplete(ar -> jdbcClient.close(clientCloseTracker)); - return clientCloseTracker.future(); + .onComplete(ar -> jdbcPool.close(poolCloseTracker)); + return poolCloseTracker.future(); } }