Skip to content

Commit

Permalink
Fail gracefully if lock cannot be obtained (#90)
Browse files Browse the repository at this point in the history
* Fix javadoc typo on response values

* Attempt to read locks from cassandra table before inserting

If inserting the lock fails, there's still a chance the table will contain
a lock. This makes retrying the run difficult, since the table will then hold
an abandoned lock that requires manual cleanup.

Attempt to read the locks at the desired consistency first. If it
succeeds, ignore the result and continue trying to insert locks with
the existing success/failure logic. If reading the locks fails, exit in a way
that has not altered tables and can be gracefully retried.

This will not handle all error cases, but helps in situations where (for
example) consistency cannot be achieved temporarily. Once nodes are returned
to service, cqlmigrate can be reattempted automatically without
requiring manual lock cleanup.

See #87

* Reduce org.apache.http logging to info

Test output seems too long to be useful, and logging headers for each
request seems overkill for general test execution.

* Refactor the is-cluster-healthy check to separate method for clarity
  • Loading branch information
davidh87 authored and adamdougal committed Jan 27, 2020
1 parent 88cce09 commit bfe7de7
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 16 deletions.
22 changes: 16 additions & 6 deletions src/main/java/uk/sky/cqlmigrate/CassandraLockingMechanism.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package uk.sky.cqlmigrate;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.WriteTimeoutException;
import org.slf4j.Logger;
Expand All @@ -20,6 +16,7 @@ class CassandraLockingMechanism extends LockingMechanism {
private final ConsistencyLevel consistencyLevel;
private final String lockKeyspace;

private PreparedStatement selectLockQuery;
private PreparedStatement insertLockQuery;
private PreparedStatement deleteLockQuery;
private boolean isRetryAfterWriteTimeout;
Expand All @@ -41,6 +38,8 @@ public void init() throws CannotAcquireLockException {
super.init();

try {
selectLockQuery = session.prepare(String.format("SELECT name,client FROM %s.locks LIMIT 1", lockKeyspace))
.setConsistencyLevel(consistencyLevel);
insertLockQuery = session.prepare(String.format("INSERT INTO %s.locks (name, client) VALUES (?, ?) IF NOT EXISTS", lockKeyspace))
.setConsistencyLevel(consistencyLevel);
deleteLockQuery = session.prepare(String.format("DELETE FROM %s.locks WHERE name = ? IF client = ?", lockKeyspace))
Expand All @@ -55,14 +54,15 @@ public void init() throws CannotAcquireLockException {
* {@inheritDoc}
* <p>
* Returns true if successfully inserted lock.
* Returns false if current lock is owned by this client.
* Returns true if current lock is owned by this client.
* Returns false if WriteTimeoutException thrown.
*
* @throws CannotAcquireLockException if any DriverException thrown while executing queries.
*/
@Override
public boolean acquire(String clientId) throws CannotAcquireLockException {
try {
verifyClusterIsHealthy();
ResultSet resultSet = session.execute(insertLockQuery.bind(lockName, clientId));
Row currentLock = resultSet.one();
// we could already hold the lock and not be aware if a previous acquire had a writetimeout as a timeout is not a failure in cassandra
Expand All @@ -80,6 +80,16 @@ public boolean acquire(String clientId) throws CannotAcquireLockException {
}
}

/**
* Verify that a select of the locks completes successfully.
* <p>
* Executes the prepared select-lock statement with no bound values and discards
* the returned result set; returning normally means the read completed.
* Called at the start of acquire(), before the lock insert is executed.
*
* @throws DriverException propagated from the session.execute() call. See
* {@link ResultSetFuture#getUninterruptibly()} for more explicit details
*/
private void verifyClusterIsHealthy() {
// The result is intentionally ignored: a failed read surfaces as an exception.
session.execute(selectLockQuery.bind());
}

/**
* {@inheritDoc}
* <p>
Expand Down
126 changes: 116 additions & 10 deletions src/test/java/uk/sky/cqlmigrate/CassandraLockingMechanismTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import uk.sky.cqlmigrate.exception.CannotReleaseLockException;
import uk.sky.cqlmigrate.util.PortScavenger;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.assertj.core.api.Assertions.*;
import static org.scassandra.http.client.PrimingRequest.then;
import static org.scassandra.http.client.types.ColumnMetadata.column;

Expand All @@ -37,6 +37,11 @@ public class CassandraLockingMechanismTest {
private final PrimingClient primingClient = SCASSANDRA.primingClient();
private final ActivityClient activityClient = SCASSANDRA.activityClient();

private final PreparedStatementExecution selectLockPreparedStatement = PreparedStatementExecution.builder()
.withPreparedStatementText("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
.withConsistency("ALL")
.build();

private final PreparedStatementExecution deleteLockPreparedStatement = PreparedStatementExecution.builder()
.withPreparedStatementText("DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?")
.withConsistency("ALL")
Expand All @@ -60,6 +65,15 @@ public void baseSetup() throws Exception {
.withPort(BINARY_PORT)
.build();

primingClient.prime(PrimingRequest.preparedStatementBuilder()
.withQuery("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
.withThen(then()
.withColumnTypes(column("name", PrimitiveType.ASCII))
.withColumnTypes(column("client", PrimitiveType.ASCII))
.withRows(Collections.emptyList())
)
.build());

primingClient.prime(PrimingRequest.preparedStatementBuilder()
.withQuery("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS")
.withThen(then()
Expand Down Expand Up @@ -121,13 +135,43 @@ public void shouldInsertLockWhenAcquiringLock() throws Exception {
lockingMechanism.acquire(CLIENT_ID);

//then
assertThat(activityClient.retrievePreparedStatementExecutions())
.hasOnlyOneElementSatisfying(preparedStatementExecution -> {
assertThat(preparedStatementExecution)
.usingRecursiveComparison()
.ignoringFields("variableTypes", "timestamp")
.isEqualTo(insertLockPreparedStatement);
});
List<PreparedStatementExecution> executions = activityClient.retrievePreparedStatementExecutions();
assertThat(executions.size()).isEqualTo(2);

assertThat(executions.get(0))
.usingRecursiveComparison()
.ignoringFields("timestamp")
.isEqualTo(selectLockPreparedStatement);

assertThat(executions.get(1))
.usingRecursiveComparison()
.ignoringFields("variableTypes", "timestamp")
.isEqualTo(insertLockPreparedStatement);
}

/**
 * Verifies that a failed read of the locks stops {@code acquire} before the lock insert is issued.
 * <p>
 * The select-lock query is primed to respond with {@code unavailable}; the only prepared
 * statement execution recorded afterwards must be that select.
 */
@Test
public void shouldNotAttemptToInsertLockIfReadingLocksFails() throws Exception {
    //given
    primingClient.prime(PrimingRequest.preparedStatementBuilder()
            .withQuery("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
            .withThen(then()
                    .withResult(Result.unavailable))
            .build()
    );

    //when
    Throwable throwable = catchThrowable(() -> lockingMechanism.acquire(CLIENT_ID));

    //then
    assertThat(throwable).isNotNull();

    List<PreparedStatementExecution> executions = activityClient.retrievePreparedStatementExecutions();
    // hasSize reports the actual list contents on failure, unlike comparing size() to a number
    assertThat(executions).hasSize(1);

    assertThat(executions.get(0))
            .usingRecursiveComparison()
            .ignoringFields("timestamp")
            .isEqualTo(selectLockPreparedStatement);
}

@Test
Expand All @@ -141,6 +185,28 @@ public void shouldSuccessfullyAcquireLockWhenInsertIsApplied() throws Exception
.isTrue();
}

/**
 * Verifies that {@code acquire} still reports success when the preliminary select
 * already returns a lock row for this client, i.e. the select result is not consulted.
 */
@Test
public void shouldSuccessfullyAcquireLockIgnoringResultOfSelectLocks() throws Exception {
    //given
    String existingLockName = String.format("%s.schema_migration", LOCK_KEYSPACE);
    PrimingRequest selectReturnsExistingLock = PrimingRequest.preparedStatementBuilder()
            .withQuery("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
            .withThen(then()
                    .withColumnTypes(column("name", PrimitiveType.ASCII))
                    .withColumnTypes(column("client", PrimitiveType.ASCII))
                    .withRows(ImmutableMap.of("name", existingLockName, "client", CLIENT_ID)))
            .build();
    primingClient.prime(selectReturnsExistingLock);

    //when
    boolean lockWasAcquired = lockingMechanism.acquire(CLIENT_ID);

    //then
    assertThat(lockWasAcquired)
            .describedAs("lock was acquired")
            .isTrue();
}

@Test
public void shouldUnsuccessfullyAcquireLockWhenInsertIsNotApplied() throws Exception {
//given
Expand All @@ -162,6 +228,46 @@ public void shouldUnsuccessfullyAcquireLockWhenInsertIsNotApplied() throws Excep
.isFalse();
}

/**
 * Verifies that an {@code unavailable} response to the preliminary select surfaces as a
 * {@link CannotAcquireLockException} caused by a {@link DriverException}, carrying the
 * standard "failed to execute" message.
 */
@Test
public void shouldThrowExceptionWhenCannotInitiallyReadLocks() throws Exception {
    //given
    PrimingRequest selectIsUnavailable = PrimingRequest.preparedStatementBuilder()
            .withQuery("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
            .withThen(then()
                    .withResult(Result.unavailable))
            .build();
    primingClient.prime(selectIsUnavailable);

    //when
    Throwable failure = catchThrowable(() -> lockingMechanism.acquire(CLIENT_ID));

    //then
    String expectedMessage = String.format(
            "Query to acquire lock %s.schema_migration for client %s failed to execute",
            LOCK_KEYSPACE,
            CLIENT_ID);
    assertThat(failure)
            .isNotNull()
            .isInstanceOf(CannotAcquireLockException.class)
            .hasCauseInstanceOf(DriverException.class)
            .hasMessage(expectedMessage);
}

/**
 * Verifies that {@code acquire} returns {@code false} rather than throwing when the
 * preliminary select is primed to answer with a write request timeout.
 */
@Test
public void shouldUnsuccessfullyAcquireLockWhenWriteTimeoutOccursWhenInitiallyReadingLocks() throws Exception {
    //given
    PrimingRequest selectTimesOut = PrimingRequest.preparedStatementBuilder()
            .withQuery("SELECT name,client FROM cqlmigrate.locks LIMIT 1")
            .withThen(then()
                    .withResult(Result.write_request_timeout))
            .build();
    primingClient.prime(selectTimesOut);

    //when
    boolean lockWasAcquired = lockingMechanism.acquire(CLIENT_ID);

    //then
    assertThat(lockWasAcquired)
            .describedAs("lock was not acquired")
            .isFalse();
}

@Test
public void shouldSuccessfullyAcquireLockWhenLockIsAlreadyAcquired() throws Exception {
//given
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
</encoder>
</appender>

<logger name="org.apache.http" level="INFO" />

<root level="debug">
<appender-ref ref="STDOUT" />
</root>
Expand Down

0 comments on commit bfe7de7

Please sign in to comment.