Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade SPI and fix blocking errors #489

Merged
merged 6 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,24 @@
<artifactId>cloud-spanner-r2dbc</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Until Spring Boot picks up Spring Data R2DBC 1.4.1, drivers
compatible with SPI 0.9 have a runtime method mismatch with
getColumnMetadatas() signature.
The next two dependency overrides fix that.
-->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
</dependencies>

</dependencyManagement>
<dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static class Builder {
@Deprecated
public Builder setUrl(String url) {
String databaseString =
ConnectionFactoryOptions.parse(url).getValue(ConnectionFactoryOptions.DATABASE);
(String) ConnectionFactoryOptions.parse(url).getValue(ConnectionFactoryOptions.DATABASE);

validateFullyQualifiedDatabaseName(databaseString);
this.fullyQualifiedDatabaseName = databaseString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOption
@Override
public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
Assert.requireNonNull(connectionFactoryOptions, "connectionFactoryOptions must not be null");
String driver = connectionFactoryOptions.getValue(DRIVER);
String driver = (String) connectionFactoryOptions.getValue(DRIVER);

return DRIVER_NAME.equals(driver) || SHORT_DRIVER_NAME.equals(driver);
}
Expand All @@ -122,16 +122,16 @@ SpannerConnectionConfiguration createConfiguration(
// Directly passed URL is supported for backwards compatibility. R2DBC SPI does not provide
// the original URL when creating connection through ConnectionFactories.get(String).
if (options.hasOption(URL)) {
config.setUrl(options.getValue(URL));
config.setUrl((String) options.getValue(URL));
} else if (options.hasOption(DATABASE)
&& FQDN_PATTERN_PARSE.matcher(options.getValue(DATABASE)).matches()) {
&& FQDN_PATTERN_PARSE.matcher((String) options.getValue(DATABASE)).matches()) {
// URL-based connection configuration
config.setFullyQualifiedDatabaseName(options.getValue(DATABASE));
config.setFullyQualifiedDatabaseName((String) options.getValue(DATABASE));
} else {
// Programmatic connection configuration.
config.setProjectId(options.getRequiredValue(PROJECT))
.setInstanceName(options.getRequiredValue(INSTANCE))
.setDatabaseName(options.getRequiredValue(DATABASE));
config.setProjectId((String) options.getRequiredValue(PROJECT))
.setInstanceName((String) options.getRequiredValue(INSTANCE))
.setDatabaseName((String) options.getRequiredValue(DATABASE));
}

config.setCredentials(extractCredentials(options));
Expand All @@ -142,7 +142,7 @@ SpannerConnectionConfiguration createConfiguration(
}

if (options.hasOption(OPTIMIZER_VERSION)) {
config.setOptimizerVersion(options.getValue(OPTIMIZER_VERSION));
config.setOptimizerVersion((String) options.getValue(OPTIMIZER_VERSION));
}

if (options.hasOption(AUTOCOMMIT)) {
Expand Down Expand Up @@ -189,11 +189,11 @@ private OAuth2Credentials extractCredentials(ConnectionFactoryOptions options) {
}

if (options.hasOption(OAUTH_TOKEN)) {
return this.credentialsHelper.getOauthCredentials(options.getValue(OAUTH_TOKEN));
return this.credentialsHelper.getOauthCredentials((String) options.getValue(OAUTH_TOKEN));
} else if (options.hasOption(CREDENTIALS)) {
return this.credentialsHelper.getFileCredentials(options.getValue(CREDENTIALS));
return this.credentialsHelper.getFileCredentials((String) options.getValue(CREDENTIALS));
} else if (options.hasOption(GOOGLE_CREDENTIALS)) {
return options.getValue(GOOGLE_CREDENTIALS);
return (OAuth2Credentials) options.getValue(GOOGLE_CREDENTIALS);
} else if (options.hasOption(USE_PLAIN_TEXT)) {
return NoCredentials.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,10 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(this.structField);
}

@Override
public io.r2dbc.spi.Type getType() {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.r2dbc.spi.ConnectionMetadata;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import java.time.Duration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

Expand All @@ -47,6 +49,21 @@ public Publisher<Void> beginTransaction() {
return this.clientLibraryAdapter.beginTransaction();
}

@Override
public Publisher<Void> beginTransaction(TransactionDefinition definition) {
return Mono.error(new UnsupportedOperationException());
}

@Override
public Publisher<Void> setLockWaitTimeout(Duration timeout) {
return Mono.error(new UnsupportedOperationException());
}

@Override
public Publisher<Void> setStatementTimeout(Duration timeout) {
return Mono.error(new UnsupportedOperationException());
}

@Override
public Mono<Void> beginReadonlyTransaction(TimestampBound timestampBound) {
return this.clientLibraryAdapter.beginReadonlyTransaction(timestampBound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -54,4 +56,15 @@ public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFun
return mappingFunction.apply(row, this.rowMetadata);
});
}

@Override
public Result filter(Predicate<Segment> filter) {
throw new UnsupportedOperationException();
}

@Override
public <T> Publisher<T> flatMap(
Function<Segment, ? extends Publisher<? extends T>> mappingFunction) {
return Mono.error(new UnsupportedOperationException());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public RowMetadata generateMetadata() {
Assert.requireNonNull(this.rowFields.getType(), "rowFields type must not be null");
return new SpannerClientLibraryRowMetadata(this.rowFields.getType().getStructFields());
}

@Override
public RowMetadata getMetadata() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ColumnMetadata getColumnMetadata(String identifier) {
}

@Override
public Iterable<? extends ColumnMetadata> getColumnMetadatas() {
public List<? extends ColumnMetadata> getColumnMetadatas() {
elefeint marked this conversation as resolved.
Show resolved Hide resolved
return this.columnMetadatas;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.cloud.spanner.r2dbc.v2.JsonWrapper;
import com.google.cloud.spanner.r2dbc.v2.SpannerClientLibraryConnectionFactory;
import io.r2dbc.spi.Closeable;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
Expand All @@ -42,8 +41,6 @@
import io.r2dbc.spi.ValidationDepth;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -147,12 +144,8 @@ void testMetadata() {

StepVerifier.create(
Mono.from(conn.createStatement("SELECT AUTHOR, PRICE FROM BOOKS LIMIT 1").execute())
.flatMapMany(rs -> rs.map((row, rmeta) -> {
List<ColumnMetadata> list = new ArrayList<>();
rmeta.getColumnMetadatas().forEach(list::add);
return list;
})))
.assertNext(metadataList -> {
.flatMapMany(rs -> rs.map((row, rmeta) -> rmeta.getColumnMetadatas()))
).assertNext(metadataList -> {
assertEquals(2, metadataList.size());
assertEquals("AUTHOR", metadataList.get(0).getName());
assertEquals("PRICE", metadataList.get(1).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ interface DatabaseProperties {
String URL = String.format(
"r2dbc:cloudspanner://spanner.googleapis.com:443/projects/%s/instances/%s/databases/%s",
ServiceOptions.getDefaultProjectId(), INSTANCE, DATABASE);

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
/**
* R2DBC TCK test implementation.
*/
@Disabled ("Until missing SPI v0.9 functionality is implemented")
public class SpannerClientLibraryTestKit implements TestKit<String> {

private static final String DISABLE_UNSUPPORTED_FUNCTIONALITY =
Expand Down Expand Up @@ -95,11 +96,11 @@ static void setUp() {

DatabaseId id = DatabaseId.of(
options.getProjectId(), DatabaseProperties.INSTANCE, DatabaseProperties.DATABASE);
createTableIfNeeded(id, "test", " ( value INT64 ) PRIMARY KEY (value)");
createTableIfNeeded(id, "test", " ( test_value INT64 ) PRIMARY KEY (test_value)");
createTableIfNeeded(
id, "test_two_column", " ( col1 INT64, col2 STRING(MAX) ) PRIMARY KEY (col1)");
createTableIfNeeded(id, "blob_test", " ( value BYTES(MAX) ) PRIMARY KEY (value)");
createTableIfNeeded(id, "clob_test", " ( value BYTES(MAX) ) PRIMARY KEY (value)");
createTableIfNeeded(id, "blob_test", " ( test_value BYTES(MAX) ) PRIMARY KEY (test_value)");
createTableIfNeeded(id, "clob_test", " ( test_value BYTES(MAX) ) PRIMARY KEY (test_value)");
}

private static void createTableIfNeeded(DatabaseId id, String tableName, String definition) {
Expand Down Expand Up @@ -221,13 +222,14 @@ public void duplicateColumnNames() {
.execute())

.flatMap(result -> result
.map((row, rowMetadata) -> Arrays.asList(row.get("value"), row.get("VALUE"))))
.map((row, rowMetadata) ->
Arrays.asList(row.get("test_value"), row.get("TEST_VALUE"))))
.flatMapIterable(Function.identity())

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(100L).as("value from col1")
.expectNext("hello").as("value from col2")
.expectNext(100L).as("test_value from col1")
.expectNext("hello").as("test_value from col2")
.verifyComplete();
}

Expand Down Expand Up @@ -366,7 +368,7 @@ public void transactionCommit() {
functionality. */
Mono<List<Long>> extractColumnsLong(Result result) {
return Flux.from(result
.map((row, rowMetadata) -> row.get("value", Long.class)))
.map((row, rowMetadata) -> row.get("test_value", Long.class)))
.collectList();
}

Expand All @@ -392,7 +394,7 @@ public void changeAutoCommitCommitsTransaction() {
.flatMap(Result::getRowsUpdated)
.thenMany(connection.setAutoCommit(true))
.thenMany(connection.createStatement(expand(TestStatement.SELECT_VALUE)).execute())
.flatMap(it -> it.map((row, metadata) -> row.get("value")))
.flatMap(it -> it.map((row, metadata) -> row.get("test_value")))
.concatWith(close(connection))
)
.as(StepVerifier::create)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ class SpannerTestKitStatements {
static final Map<TestStatement, String> STATEMENTS = new HashMap<>();

static {
STATEMENTS.put(INSERT_VALUE100, "INSERT INTO test (value) VALUES (100)");
STATEMENTS.put(INSERT_VALUE200, "INSERT INTO test (value) VALUES (200)");
STATEMENTS.put(INSERT_VALUE100, "INSERT INTO test (test_value) VALUES (100)");
STATEMENTS.put(INSERT_VALUE200, "INSERT INTO test (test_value) VALUES (200)");
STATEMENTS.put(INSERT_TWO_COLUMNS,
"INSERT INTO test_two_column (col1, col2) VALUES (100, 'hello')");
STATEMENTS.put(INSERT_BLOB_VALUE_PLACEHOLDER, "INSERT INTO blob_test VALUES (?)");
STATEMENTS.put(INSERT_CLOB_VALUE_PLACEHOLDER, "INSERT INTO clob_test VALUES (?)");
STATEMENTS.put(INSERT_VALUE_PLACEHOLDER, "INSERT INTO test (value) VALUES (%s)");
STATEMENTS.put(INSERT_VALUE_PLACEHOLDER, "INSERT INTO test (test_value) VALUES (%s)");

// Spanner column names are case-sensitive
STATEMENTS.put(SELECT_VALUE_TWO_COLUMNS,
"SELECT col1 AS value, col2 AS VALUE FROM test_two_column");
"SELECT col1 AS test_value, col2 AS TEST_VALUE FROM test_two_column");
}

static String expand(TestStatement statement, Object... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.r2dbc.v2;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.google.cloud.spanner.Type;
Expand All @@ -25,6 +26,7 @@
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -65,4 +67,12 @@ void simpleTypeMetadataAsExpected(Type spannerType, Class javaType) {
assertThat(meta.getNativeTypeMetadata()).isEqualTo(spannerType);
assertThat(meta.getJavaType()).isEqualTo(javaType);
}

@Test
void getTypeNotSupported() {
StructField field = StructField.of("col1", Type.string());
SpannerClientLibraryColumnMetadata metadata = new SpannerClientLibraryColumnMetadata(field);
assertThatThrownBy(() -> metadata.getType())
.isInstanceOf(UnsupportedOperationException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.IsolationLevel;
import java.time.Duration;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -78,4 +80,25 @@ void batchUsesCorrectAdapter() {
assertThat(args).hasSize(1);
assertThat(args.get(0).getSql()).isEqualTo("UPDATE tbl SET col1=val1");
}

@Test
void beginTransactionCustomDefinitionNotSupported() {
StepVerifier.create(
this.connection.beginTransaction(IsolationLevel.SERIALIZABLE)
).verifyError(UnsupportedOperationException.class);
}

@Test
void setLockWaitTimeoutNotSupported() {
StepVerifier.create(
this.connection.setLockWaitTimeout(Duration.ofSeconds(1))
).verifyError(UnsupportedOperationException.class);
}

@Test
void setStatementTimeoutNotSupported() {
StepVerifier.create(
this.connection.setStatementTimeout(Duration.ofSeconds(1))
).verifyError(UnsupportedOperationException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.r2dbc.v2;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -25,6 +26,7 @@
import io.r2dbc.spi.RowMetadata;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class SpannerClientLibraryResultTest {
Expand Down Expand Up @@ -65,6 +67,19 @@ void mapGeneratesMetadataOnlyOnFirstCall() {
verify(mockRow2, times(0)).generateMetadata();
}

@Test
void filterNotSupported() {
SpannerClientLibraryResult result = new SpannerClientLibraryResult(Flux.empty(), 0);
assertThatThrownBy(() -> result.filter(null))
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
void flatMapWithSegmentNotSupported() {
SpannerClientLibraryResult result = new SpannerClientLibraryResult(Flux.empty(), 0);
StepVerifier.create(
result.flatMap(segment -> Mono.empty())
).verifyError(UnsupportedOperationException.class);
}

}
Loading