Skip to content

Commit

Permalink
Upgrade to support H2 2.0.
Browse files Browse the repository at this point in the history
[resolves #203]

Signed-off-by: Greg L. Turnquist <[email protected]>
  • Loading branch information
gregturn committed Dec 6, 2021
1 parent a9b8f5d commit dd279a6
Show file tree
Hide file tree
Showing 66 changed files with 389 additions and 357 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<properties>
<assertj.version>3.20.2</assertj.version>
<h2.version>1.4.200</h2.version>
<h2.version>2.0.202</h2.version>

This comment has been minimized.

Copy link
@abelsromero

abelsromero Dec 17, 2021

@gregturn it seems the line with the newer version was added without removing the previous one?

This comment has been minimized.

Copy link
@gregturn

gregturn Jan 12, 2022

Author Contributor

Thanks @abelsromero. Resolved via a68533e.

<java.version>1.8</java.version>
<jmh.version>1.32</jmh.version>
<jsr305.version>3.0.2</jsr305.version>
Expand All @@ -44,7 +45,7 @@
<mockito.version>3.11.2</mockito.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<r2dbc-spi.version>0.9.0.M2</r2dbc-spi.version>
<r2dbc-spi.version>0.9.0.RELEASE</r2dbc-spi.version>
<reactor.version>Dysprosium-SR22</reactor.version>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
</properties>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/r2dbc/h2/H2Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public Flux<H2Result> execute() {

ResultWithGeneratedKeys result = this.client.update(command, false);
CommandUtil.clearForReuse(command);
return H2Result.toResult(this.codecs, result.getUpdateCount());
int updatedCountInt = Long.valueOf(result.getUpdateCount()).intValue();
return H2Result.toResult(this.codecs, updatedCountInt);
}
} catch (DbException e) {
throw H2DatabaseExceptionFactory.convert(e);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/r2dbc/h2/H2ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private static ConnectionInfo getConnectionInfo(H2ConnectionConfiguration config
Properties properties = new Properties();
properties.putAll(configuration.getProperties());

return new ConnectionInfo(sb.toString(), properties);
return new ConnectionInfo(sb.toString(), properties, null, null);
} catch (DbException e) {
throw H2DatabaseExceptionFactory.convert(e);
}
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/r2dbc/h2/H2Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public final class H2Result implements Result {

private final Mono<Integer> rowsUpdated;

private final Flux<Segment> segments;
private final Flux<? extends Segment> segments;

H2Result(H2RowMetadata rowMetadata, Flux<H2Row> rows, Mono<Integer> rowsUpdated, Flux<Segment> segments) {
this.rowMetadata = Assert.requireNonNull(rowMetadata, "rowMetadata must not be null");
H2Result(H2RowMetadata rowMetadata, Flux<H2Row> rows, Mono<Integer> rowsUpdated, Flux<? extends Segment> segments) {
this.rowMetadata = rowMetadata;
this.rows = Assert.requireNonNull(rows, "rows must not be null");
this.rowsUpdated = Assert.requireNonNull(rowsUpdated, "rowsUpdated must not be null");
this.segments = Assert.requireNonNull(segments, "segments must not be null");
Expand All @@ -70,7 +70,7 @@ public Mono<Integer> getRowsUpdated() {
public H2Result filter(Predicate<Segment> filter) {
Assert.requireNonNull(filter, "predicate must not be null");

Flux<Segment> filteredSegments = this.segments.filter(filter::test);
Flux<? extends Segment> filteredSegments = this.segments.filter(filter::test);

return new H2Result(this.rowMetadata, this.rows, this.rowsUpdated, filteredSegments);
}
Expand Down Expand Up @@ -115,7 +115,7 @@ public String toString() {
static H2Result toResult(Codecs codecs, @Nullable Integer rowsUpdated) {
Assert.requireNonNull(codecs, "codecs must not be null");

return new H2Result(Mono.justOrEmpty(rowsUpdated), Flux.empty());
return new H2Result(Mono.justOrEmpty(rowsUpdated), Flux.just((UpdateCount) () -> rowsUpdated));
}

static H2Result toResult(Codecs codecs, ResultInterface result, @Nullable Integer rowsUpdated) {
Expand Down Expand Up @@ -148,6 +148,6 @@ public Value[] next() {
.map(values -> H2Row.toRow(values, result, codecs, rowMetadata))
.onErrorMap(DbException.class, H2DatabaseExceptionFactory::convert);

return new H2Result(rowMetadata, rows, Mono.justOrEmpty(rowsUpdated), Flux.empty());
return new H2Result(rowMetadata, rows, Mono.justOrEmpty(rowsUpdated), rows);
}
}
12 changes: 9 additions & 3 deletions src/main/java/io/r2dbc/h2/H2Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.r2dbc.h2.codecs.Codecs;
import io.r2dbc.h2.util.Assert;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import org.h2.result.ResultInterface;
import org.h2.value.TypeInfo;
Expand All @@ -29,7 +30,7 @@
/**
* An implementation of {@link Row} for an H2 database.
*/
public final class H2Row implements Row {
public final class H2Row implements Row, Result.RowSegment {

private final Codecs codecs;

Expand Down Expand Up @@ -87,6 +88,11 @@ public int hashCode() {
return Objects.hash(this.columns);
}

@Override
public H2Row row() {
return this;
}

@Override
public String toString() {
return "H2Row{" +
Expand Down Expand Up @@ -120,15 +126,15 @@ private Column getColumn(String name) {
String normalized = name.toUpperCase();

if (!this.nameKeyedColumns.containsKey(normalized)) {
throw new IllegalArgumentException(String.format("Column name '%s' does not exist in column names %s", normalized, this.nameKeyedColumns.keySet()));
throw new NoSuchElementException(String.format("Column name '%s' does not exist in column names %s", normalized, this.nameKeyedColumns.keySet()));
}

return this.nameKeyedColumns.get(normalized);
}

private Column getColumn(int index) {
if (index >= this.columns.size()) {
throw new IllegalArgumentException(String.format("Column index %d is larger than the number of columns %d", index, this.columns.size()));
throw new IndexOutOfBoundsException(String.format("Column index %d is larger than the number of columns %d", index, this.columns.size()));
}

return this.columns.get(index);
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/r2dbc/h2/H2Statement.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ public final class H2Statement implements Statement {
@Override
public H2Statement add() {
this.bindings.finish();
this.bindings.open = true;
return this;
}

@Override
public H2Statement bind(String name, Object value) {
Assert.requireNonNull(name, "name must not be null");

this.bindings.open = false;
return addIndex(getIndex(name), value);
}

Expand All @@ -98,6 +99,8 @@ public H2Statement bindNull(int index, @Nullable Class<?> type) {

@Override
public Flux<H2Result> execute() {
Assert.requireTrue(!this.bindings.open, "No unfinished bindings!");

return Flux.fromArray(this.sql.split(";"))
.flatMap(sql -> {
if (this.generatedColumns == null) {
Expand Down Expand Up @@ -146,9 +149,11 @@ private static Flux<H2Result> execute(Client client, String sql, Bindings bindin
ResultWithGeneratedKeys result = client.update(command, generatedColumns);
CommandUtil.clearForReuse(command);
if (GeneratedKeysMode.valueOf(generatedColumns) == GeneratedKeysMode.NONE) {
return H2Result.toResult(codecs, result.getUpdateCount());
int updatedCountInt = Long.valueOf(result.getUpdateCount()).intValue();
return H2Result.toResult(codecs, updatedCountInt);
} else {
return H2Result.toResult(codecs, result.getGeneratedKeys(), result.getUpdateCount());
int updatedCountInt = Long.valueOf(result.getUpdateCount()).intValue();
return H2Result.toResult(codecs, result.getGeneratedKeys(), updatedCountInt);
}
}
} catch (DbException e) {
Expand All @@ -173,6 +178,12 @@ private static final class Bindings {

private Binding current;

private boolean open = false;

public boolean isOpen() {
return open;
}

@Override
public String toString() {
return "Bindings{" +
Expand All @@ -183,6 +194,7 @@ public String toString() {

private void finish() {
this.current = null;
this.open = false;
}

private Binding getCurrent() {
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/r2dbc/h2/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.r2dbc.h2.util.Assert;
import org.h2.command.Command;
import org.h2.command.CommandInterface;
import org.h2.engine.SessionInterface;
import org.h2.engine.Session;
import org.h2.result.ResultInterface;
import org.h2.result.ResultWithGeneratedKeys;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -113,7 +113,7 @@ default void execute(String sql) {
ResultWithGeneratedKeys update(CommandInterface command, Object generatedColumns);

/**
* Return back the current {@link SessionInterface} to the database.
* Return back the current {@link Session} to the database.
*/
SessionInterface getSession();
Session getSession();
}
23 changes: 13 additions & 10 deletions src/main/java/io/r2dbc/h2/client/SessionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.r2dbc.h2.util.Assert;
import org.h2.command.CommandInterface;
import org.h2.engine.ConnectionInfo;
import org.h2.engine.SessionInterface;
import org.h2.engine.Session;
import org.h2.engine.SessionRemote;
import org.h2.expression.ParameterInterface;
import org.h2.message.DbException;
Expand All @@ -31,22 +31,18 @@
import reactor.util.Logger;
import reactor.util.Loggers;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* An implementation of {@link Client} that wraps an H2 {@link SessionInterface}.
* An implementation of {@link Client} that wraps an H2 {@link Session}.
*/
public final class SessionClient implements Client {

private final Logger logger = Loggers.getLogger(this.getClass());

private final Collection<Binding> emptyBinding = Collections.singleton(Binding.EMPTY);

private final SessionInterface session;
private final Session session;

private final boolean shutdownDatabaseOnClose;

Expand Down Expand Up @@ -100,6 +96,13 @@ public Iterator<CommandInterface> prepareCommand(String sql, List<Binding> bindi
Assert.requireNonNull(sql, "sql must not be null");
Assert.requireNonNull(bindings, "bindings must not be null");

if (!bindings.isEmpty()) {
Binding binding = bindings.get(bindings.size()-1);
if (binding.getParameters().isEmpty()) {
throw new IllegalStateException("You got an unbound binder!");
}
}

Iterator<Binding> bindingIterator = bindings.isEmpty() ? emptyBinding.iterator() : bindings.iterator();

return new Iterator<CommandInterface>() {
Expand Down Expand Up @@ -142,10 +145,10 @@ public ResultWithGeneratedKeys update(CommandInterface command, Object generated
}

/**
* Return back the current {@link SessionInterface} to the database.
* Return back the current {@link Session} to the database.
*/
@Override
public SessionInterface getSession() {
public Session getSession() {
return this.session;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/r2dbc/h2/codecs/ArrayCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ Object[] doDecode(Value value, Class<? extends Object[]> type) {
Value doEncode(Object[] value) {
return ValueArray.get(Arrays.stream(Assert.requireNonNull(value, "value must not be null"))
.map(codecs::encode)
.toArray(Value[]::new));
.toArray(Value[]::new), null);
}
}
6 changes: 3 additions & 3 deletions src/main/java/io/r2dbc/h2/codecs/BigDecimalCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import io.r2dbc.h2.util.Assert;
import org.h2.value.Value;
import org.h2.value.ValueDecimal;
import org.h2.value.ValueNumeric;

import java.math.BigDecimal;

Expand All @@ -30,7 +30,7 @@ final class BigDecimalCodec extends AbstractCodec<BigDecimal> {

@Override
boolean doCanDecode(int dataType) {
return dataType == Value.DECIMAL;
return dataType == Value.NUMERIC;
}

@Override
Expand All @@ -40,6 +40,6 @@ BigDecimal doDecode(Value value, Class<? extends BigDecimal> type) {

@Override
Value doEncode(BigDecimal value) {
return ValueDecimal.get(Assert.requireNonNull(value, "value must not be null"));
return ValueNumeric.get(Assert.requireNonNull(value, "value must not be null"));
}
}
15 changes: 8 additions & 7 deletions src/main/java/io/r2dbc/h2/codecs/BlobCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

package io.r2dbc.h2.codecs;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.Iterator;

import io.r2dbc.h2.client.Client;
import io.r2dbc.h2.util.Assert;
import io.r2dbc.spi.Blob;
import org.h2.value.Value;
import org.h2.value.ValueBlob;
import org.h2.value.ValueNull;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.Iterator;

final class BlobCodec extends AbstractCodec<Blob> {

private final Client client;
Expand Down Expand Up @@ -56,12 +57,12 @@ Blob doDecode(Value value, Class<? extends Blob> type) {
Value doEncode(Blob value) {
Assert.requireNonNull(value, "value must not be null");

Value blob = this.client.getSession().getDataHandler().getLobStorage().createBlob(
ValueBlob blob = this.client.getSession().getDataHandler().getLobStorage().createBlob(
new SequenceInputStream(
new BlobInputStreamEnumeration(value)), -1);

this.client.getSession().addTemporaryLob(blob);

return blob;
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/r2dbc/h2/codecs/BlobToByteBufferCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.r2dbc.h2.util.Assert;
import io.r2dbc.spi.Blob;
import org.h2.value.Value;
import org.h2.value.ValueBlob;
import org.h2.value.ValueNull;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -62,7 +63,7 @@ ByteBuffer doDecode(Value value, Class<? extends ByteBuffer> type) {
Value doEncode(ByteBuffer value) {
Assert.requireNonNull(value, "value must not be null");

Value blob = this.client.getSession().getDataHandler().getLobStorage().createBlob(
ValueBlob blob = this.client.getSession().getDataHandler().getLobStorage().createBlob(
new SequenceInputStream(
new BlobInputStreamEnumeration(value)), -1);

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/r2dbc/h2/codecs/ByteCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import io.r2dbc.h2.util.Assert;
import org.h2.value.Value;
import org.h2.value.ValueByte;
import org.h2.value.ValueBinary;

final class ByteCodec extends AbstractCodec<Byte> {

Expand All @@ -28,7 +28,7 @@ final class ByteCodec extends AbstractCodec<Byte> {

@Override
boolean doCanDecode(int dataType) {
return dataType == Value.BYTE;
return dataType == Value.TINYINT;
}

@Override
Expand All @@ -38,6 +38,6 @@ Byte doDecode(Value value, Class<? extends Byte> type) {

@Override
Value doEncode(Byte value) {
return ValueByte.get(Assert.requireNonNull(value, "value must not be null"));
return ValueBinary.get(new byte[]{Assert.requireNonNull(value, "value must not be null")});
}
}
Loading

0 comments on commit dd279a6

Please sign in to comment.