Skip to content

Commit

Permalink
feat(s3stream): atomic failover feature (#899)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Jan 18, 2024
1 parent 315ec49 commit cf07ae3
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 267 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.16.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.17.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.16.0-SNAPSHOT</version>
<version>0.17.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
9 changes: 9 additions & 0 deletions s3stream/src/main/java/com/automq/stream/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package com.automq.stream.api;

import com.automq.stream.s3.failover.FailoverRequest;
import com.automq.stream.s3.failover.FailoverResponse;
import java.util.concurrent.CompletableFuture;

/**
* Elastic Stream client.
*/
Expand All @@ -38,4 +42,9 @@ public interface Client {
* @return {@link KVClient}
*/
KVClient kvClient();

/**
* Failover the another node volume
*/
CompletableFuture<FailoverResponse> failover(FailoverRequest request);
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ public class Failover {
private final ExecutorService executor = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("wal-failover-%d", true), LOGGER);
private final FailoverFactory factory;
private final WALRecover walRecover;
private final Serverless serverless;

public Failover(FailoverFactory factory, WALRecover walRecover, Serverless serverless) {
public Failover(FailoverFactory factory, WALRecover walRecover) {
this.factory = factory;
this.walRecover = walRecover;
this.serverless = serverless;
}

public CompletableFuture<FailoverResponse> failover(FailoverRequest request) {
Expand Down Expand Up @@ -81,14 +79,12 @@ public FailoverResponse failover() throws Throwable {
FailoverResponse resp = new FailoverResponse();
resp.setNodeId(request.getNodeId());
// fence the device to ensure the old node stops writing to the delta WAL
serverless.fence(request.getVolumeId());
// recover WAL data and upload to S3
BlockWALService wal = BlockWALService.recoveryBuilder(request.getDevice()).build();
try {
wal.start();
} catch (WALNotInitializedException ex) {
LOGGER.info("fail over empty wal {}", request);
serverless.delete(request.getVolumeId());
return resp;
}
try {
Expand All @@ -108,8 +104,6 @@ public FailoverResponse failover() throws Throwable {
} finally {
wal.shutdownGracefully();
}
// delete the volume
serverless.delete(request.getVolumeId());
LOGGER.info("failover done {}", request);
return resp;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,23 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class FailoverTest {
String path;
FailoverFactory failoverFactory;
WALRecover walRecover;
Serverless serverless;
Failover failover;

@BeforeEach
public void setup() {
path = "/tmp/" + System.currentTimeMillis() + "/failover_test_wal";
failoverFactory = mock(FailoverFactory.class);
walRecover = mock(WALRecover.class);
serverless = mock(Serverless.class);
failover = spy(new Failover(failoverFactory, walRecover, serverless));
failover = spy(new Failover(failoverFactory, walRecover));
}

@AfterEach
Expand Down Expand Up @@ -82,12 +77,8 @@ public void test() throws IOException, ExecutionException, InterruptedException,

// node match
request.setNodeId(233);
failover.failover(request).get(1, TimeUnit.SECONDS);

ArgumentCaptor<String> ac = ArgumentCaptor.forClass(String.class);
verify(serverless, times(1)).delete(ac.capture());
String req = ac.getValue();
assertEquals("test_volume_id", req);
FailoverResponse resp = failover.failover(request).get(1, TimeUnit.SECONDS);
assertEquals(233, resp.getNodeId());
}

}

0 comments on commit cf07ae3

Please sign in to comment.