diff --git a/pom.xml b/pom.xml
index 02ca7285b..735d0f1c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.1.3-jre
2.0.9
2.2
- 0.16.0-SNAPSHOT
+ 0.17.0-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index 81ddd41b4..d43abdc41 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.16.0-SNAPSHOT
+ 0.17.0-SNAPSHOT
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/api/Client.java b/s3stream/src/main/java/com/automq/stream/api/Client.java
index 9ca1f48a1..5c0e5fabc 100644
--- a/s3stream/src/main/java/com/automq/stream/api/Client.java
+++ b/s3stream/src/main/java/com/automq/stream/api/Client.java
@@ -17,6 +17,10 @@
package com.automq.stream.api;
+import com.automq.stream.s3.failover.FailoverRequest;
+import com.automq.stream.s3.failover.FailoverResponse;
+import java.util.concurrent.CompletableFuture;
+
/**
* Elastic Stream client.
*/
@@ -38,4 +42,9 @@ public interface Client {
* @return {@link KVClient}
*/
KVClient kvClient();
+
+ /**
+ * Failover the another node volume
+ */
+ CompletableFuture failover(FailoverRequest request);
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/CompleteFailoverRequest.java b/s3stream/src/main/java/com/automq/stream/s3/failover/CompleteFailoverRequest.java
deleted file mode 100644
index 98544e07a..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/CompleteFailoverRequest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.failover;
-
-public class CompleteFailoverRequest {
- private int nodeId;
- private String volumeId;
-
- public CompleteFailoverRequest(int nodeId, String volumeId) {
- this.nodeId = nodeId;
- this.volumeId = volumeId;
- }
-
- public int getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(int nodeId) {
- this.nodeId = nodeId;
- }
-
- public String getVolumeId() {
- return volumeId;
- }
-
- public void setVolumeId(String volumeId) {
- this.volumeId = volumeId;
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/DefaultServerless.java b/s3stream/src/main/java/com/automq/stream/s3/failover/DefaultServerless.java
deleted file mode 100644
index 2328c93ac..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/DefaultServerless.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.failover;
-
-import com.automq.stream.utils.CommandResult;
-import com.automq.stream.utils.CommandUtils;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-public class DefaultServerless implements Serverless {
- private static final String SERVERLESS_CMD = "/opt/automq/scripts/amq-serverless";
-
- private static void check(String[] cmd, CommandResult rst) throws ExecutionException {
- if (rst.code() != 0) {
- throw new ExecutionException("Run " + Arrays.toString(cmd) + ", code:" + rst.code() + " failed: " + rst.stderr(), null);
- }
- }
-
- private static T jsonParse(String raw, Class clazz) throws ExecutionException {
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.readValue(raw, clazz);
- } catch (JsonProcessingException e) {
- throw new ExecutionException("json parse (" + raw + ") fail", e);
- }
- }
-
- @Override
- public String attach(String volumeId, int nodeId) throws ExecutionException {
- String[] cmd = new String[] {SERVERLESS_CMD, "volume", "attach", "-v", volumeId, "-n", Integer.toString(nodeId)};
- CommandResult result = CommandUtils.run(cmd);
- check(cmd, result);
- return jsonParse(result.stdout(), AttachResult.class).getDeviceName();
- }
-
- @Override
- public void delete(String volumeId) throws ExecutionException {
- String[] cmd = new String[] {SERVERLESS_CMD, "volume", "delete", "-v", volumeId};
- CommandResult result = CommandUtils.run(cmd);
- check(cmd, result);
- }
-
- @Override
- public void fence(String volumeId) throws ExecutionException {
- String[] cmd = new String[] {SERVERLESS_CMD, "volume", "fence", "-v", volumeId};
- CommandResult result = CommandUtils.run(cmd);
- check(cmd, result);
- }
-
- @Override
- public List scan() throws ExecutionException {
- String[] cmd = new String[] {SERVERLESS_CMD, "volume", "queryFailover"};
- CommandResult result = CommandUtils.run(cmd);
- check(cmd, result);
- QueryFailedNode[] nodes = jsonParse(result.stdout(), QueryFailedNode[].class);
- return Arrays.stream(nodes).map(n -> {
- FailedNode failedNode = new FailedNode();
- failedNode.setNodeId(Integer.parseInt(n.getFirstBindNodeId()));
- failedNode.setVolumeId(n.getVolumeId());
- return failedNode;
- }).collect(Collectors.toList());
- }
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- static class AttachResult {
- private String deviceName;
-
- public String getDeviceName() {
- return deviceName;
- }
-
- public void setDeviceName(String deviceName) {
- this.deviceName = deviceName;
- }
- }
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- static class QueryFailedNode {
- private String firstBindNodeId;
- private String volumeId;
-
- public String getFirstBindNodeId() {
- return firstBindNodeId;
- }
-
- public void setFirstBindNodeId(String firstBindNodeId) {
- this.firstBindNodeId = firstBindNodeId;
- }
-
- public String getVolumeId() {
- return volumeId;
- }
-
- public void setVolumeId(String volumeId) {
- this.volumeId = volumeId;
- }
- }
-}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/Failover.java b/s3stream/src/main/java/com/automq/stream/s3/failover/Failover.java
index 9dde15cb9..56e1818f5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/Failover.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/failover/Failover.java
@@ -46,12 +46,10 @@ public class Failover {
private final ExecutorService executor = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("wal-failover-%d", true), LOGGER);
private final FailoverFactory factory;
private final WALRecover walRecover;
- private final Serverless serverless;
- public Failover(FailoverFactory factory, WALRecover walRecover, Serverless serverless) {
+ public Failover(FailoverFactory factory, WALRecover walRecover) {
this.factory = factory;
this.walRecover = walRecover;
- this.serverless = serverless;
}
public CompletableFuture failover(FailoverRequest request) {
@@ -81,14 +79,12 @@ public FailoverResponse failover() throws Throwable {
FailoverResponse resp = new FailoverResponse();
resp.setNodeId(request.getNodeId());
// fence the device to ensure the old node stops writing to the delta WAL
- serverless.fence(request.getVolumeId());
// recover WAL data and upload to S3
BlockWALService wal = BlockWALService.recoveryBuilder(request.getDevice()).build();
try {
wal.start();
} catch (WALNotInitializedException ex) {
LOGGER.info("fail over empty wal {}", request);
- serverless.delete(request.getVolumeId());
return resp;
}
try {
@@ -108,8 +104,6 @@ public FailoverResponse failover() throws Throwable {
} finally {
wal.shutdownGracefully();
}
- // delete the volume
- serverless.delete(request.getVolumeId());
LOGGER.info("failover done {}", request);
return resp;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/Serverless.java b/s3stream/src/main/java/com/automq/stream/s3/failover/Serverless.java
deleted file mode 100644
index 180caac30..000000000
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/Serverless.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.automq.stream.s3.failover;
-
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-public interface Serverless {
-
- /**
- * Attach volume to the target node.
- *
- * @param volumeId volume id
- * @param nodeId target node id
- * @return attached device name
- */
- String attach(String volumeId, int nodeId) throws ExecutionException;
-
- /**
- * Delete the volume
- *
- * @param volumeId volume id
- */
- void delete(String volumeId) throws ExecutionException;
-
- /**
- * Fence the first attached node access to the volume
- *
- * @param volumeId volume id
- */
- void fence(String volumeId) throws ExecutionException;
-
- /**
- * Scan failed node
- *
- * @return {@link FailedNode} list
- */
- List scan() throws ExecutionException;
-
- class FailedNode {
- private int nodeId;
- private String volumeId;
-
- public int getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(int nodeId) {
- this.nodeId = nodeId;
- }
-
- public String getVolumeId() {
- return volumeId;
- }
-
- public void setVolumeId(String volumeId) {
- this.volumeId = volumeId;
- }
-
- @Override
- public String toString() {
- return "FailedNode{" +
- "nodeId=" + nodeId +
- ", volumeId='" + volumeId + '\'' +
- '}';
- }
- }
-
-}
diff --git a/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java b/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java
index 71a8b2a4a..6f6121d33 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/failover/FailoverTest.java
@@ -28,19 +28,15 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
public class FailoverTest {
String path;
FailoverFactory failoverFactory;
WALRecover walRecover;
- Serverless serverless;
Failover failover;
@BeforeEach
@@ -48,8 +44,7 @@ public void setup() {
path = "/tmp/" + System.currentTimeMillis() + "/failover_test_wal";
failoverFactory = mock(FailoverFactory.class);
walRecover = mock(WALRecover.class);
- serverless = mock(Serverless.class);
- failover = spy(new Failover(failoverFactory, walRecover, serverless));
+ failover = spy(new Failover(failoverFactory, walRecover));
}
@AfterEach
@@ -82,12 +77,8 @@ public void test() throws IOException, ExecutionException, InterruptedException,
// node match
request.setNodeId(233);
- failover.failover(request).get(1, TimeUnit.SECONDS);
-
- ArgumentCaptor ac = ArgumentCaptor.forClass(String.class);
- verify(serverless, times(1)).delete(ac.capture());
- String req = ac.getValue();
- assertEquals("test_volume_id", req);
+ FailoverResponse resp = failover.failover(request).get(1, TimeUnit.SECONDS);
+ assertEquals(233, resp.getNodeId());
}
}