Skip to content

Commit

Permalink
feat(kafka_isssues447): add command util and update version to 0.5.2 (#…
Browse files Browse the repository at this point in the history
…709)

Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Nov 23, 2023
1 parent b19a317 commit 95d491b
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.5.1-SNAPSHOT</s3stream.version>
<s3stream.version>0.5.2-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.5.1-SNAPSHOT</version>
<version>0.5.2-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
18 changes: 6 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/failover/Failover.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ public class Failover {
private final ExecutorService executor = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("wal-failover-%d", true), LOGGER);
private final FailoverFactory factory;
private final WALRecover walRecover;
private final Serverless serverless;

public Failover(FailoverFactory factory, WALRecover walRecover) {
public Failover(FailoverFactory factory, WALRecover walRecover, Serverless serverless) {
this.factory = factory;
this.walRecover = walRecover;
this.serverless = serverless;
}

public CompletableFuture<FailoverResponse> failover(FailoverRequest request) {
Expand All @@ -66,14 +68,6 @@ public CompletableFuture<FailoverResponse> failover(FailoverRequest request) {
return cf;
}

protected void fence(FailoverRequest request) {
// TODO: run command to fence the device
}

protected void complete(FailoverRequest request) {
// TODO: run command to delete the volume
}

class FailoverTask {
private final FailoverRequest request;
private int nodeId = NOOP_NODE_ID;
Expand All @@ -88,14 +82,14 @@ public FailoverResponse failover() throws Throwable {
FailoverResponse resp = new FailoverResponse();
resp.setNodeId(request.getNodeId());
// fence the device to ensure the old node stops writing to the delta WAL
fence(request);
serverless.fence(request.getVolumeId());
// recover WAL data and upload to S3
BlockWALService wal = BlockWALService.recoveryBuilder(request.getDevice()).build();
try {
wal.start();
} catch (WALNotInitializedException ex) {
LOGGER.info("fail over empty wal {}", request);
complete(request);
serverless.delete(request.getVolumeId());
return resp;
}
try {
Expand All @@ -113,7 +107,7 @@ public FailoverResponse failover() throws Throwable {
LOGGER.info("failover recover {}", request);
walRecover.recover(wal, streamManager, objectManager, taskLogger);
// delete the volume
complete(request);
serverless.delete(request.getVolumeId());
LOGGER.info("failover done {}", request);
} finally {
wal.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.failover;

import java.util.List;

public interface Serverless {

/**
* Attach volume to the target node.
* @param volumeId volume id
* @param nodeId target node id
* @return attached device name
*/
String attach(String volumeId, String nodeId);

/**
* Delete the volume
* @param volumeId volume id
*/
void delete(String volumeId);

/**
* Fence the first attached node access to the volume
* @param volumeId volume id
*/
void fence(String volumeId);

/**
* Scan failed node
* @return {@link FailedNode} list
*/
List<FailedNode> scan();

class FailedNode {
private int nodeId;
private String volumeId;

public int getNodeId() {
return nodeId;
}

public void setNodeId(int nodeId) {
this.nodeId = nodeId;
}

public String getVolumeId() {
return volumeId;
}

public void setVolumeId(String volumeId) {
this.volumeId = volumeId;
}

@Override
public String toString() {
return "FailedNode{" +
"nodeId=" + nodeId +
", volumeId='" + volumeId + '\'' +
'}';
}
}


}
51 changes: 51 additions & 0 deletions s3stream/src/main/java/com/automq/stream/utils/CommandResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.utils;

public class CommandResult {
private final int code;
private final String stdout;
private final String stderr;

public CommandResult(int code, String stdout, String stderr) {
this.code = code;
this.stdout = stdout;
this.stderr = stderr;
}

public int code() {
return code;
}

public String stdout() {
return stdout;
}

public String stderr() {
return stderr;
}

@Override
public String toString() {
return "CommandResult{" +
"code=" + code +
", stdout='" + stdout + '\'' +
", stderr='" + stderr + '\'' +
'}';
}
}
39 changes: 39 additions & 0 deletions s3stream/src/main/java/com/automq/stream/utils/CommandUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.utils;

import java.io.BufferedReader;
import java.io.IOException;

public class CommandUtils {
public CommandResult run(String... cmd) {
try {
Process p = Runtime.getRuntime().exec(cmd);
try (BufferedReader inputReader = p.inputReader();
BufferedReader errorReader = p.errorReader()) {
String stdout = String.join("\n", inputReader.lines().toList());
String stderr = String.join("\n", errorReader.lines().toList());
int code = p.waitFor();
return new CommandResult(code, stdout, stderr);
}
} catch (IOException | InterruptedException e) {
return new CommandResult(-1, "", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ public class FailoverTest {
String path;
FailoverFactory failoverFactory;
WALRecover walRecover;
Serverless serverless;
Failover failover;

@BeforeEach
public void setup() {
path = "/tmp/" + System.currentTimeMillis() + "/failover_test_wal";
failoverFactory = mock(FailoverFactory.class);
walRecover = mock(WALRecover.class);
failover = spy(new Failover(failoverFactory, walRecover));
serverless = mock(Serverless.class);
failover = spy(new Failover(failoverFactory, walRecover, serverless));
}

@AfterEach
Expand Down Expand Up @@ -83,11 +85,10 @@ public void test() throws IOException, ExecutionException, InterruptedException,
request.setNodeId(233);
failover.failover(request).get(1, TimeUnit.SECONDS);

ArgumentCaptor<FailoverRequest> ac = ArgumentCaptor.forClass(FailoverRequest.class);
verify(failover, times(1)).complete(ac.capture());
FailoverRequest req = ac.getValue();
assertEquals(233, req.getNodeId());
assertEquals("test_volume_id", req.getVolumeId());
ArgumentCaptor<String> ac = ArgumentCaptor.forClass(String.class);
verify(serverless, times(1)).delete(ac.capture());
String req = ac.getValue();
assertEquals("test_volume_id", req);
}

}

0 comments on commit 95d491b

Please sign in to comment.