Skip to content

Commit

Permalink
refactor(s3stream/wal): use stat to check whether the path is a blo…
Browse files Browse the repository at this point in the history
…ck device (#935)

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Feb 20, 2024
1 parent 1d1e0ff commit 31bfba3
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
5 changes: 5 additions & 0 deletions s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>aspectjweaver</artifactId>
<version>${aspectj.version}</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;

public class BenchTool {

public static Namespace parseArgs(ArgumentParser parser, String[] args) {
Expand Down Expand Up @@ -51,7 +53,7 @@ public static int recoverAndReset(WriteAheadLog wal) {

public static void resetWALHeader(String path) throws IOException {
System.out.println("Resetting WAL header");
if (path.startsWith(WALChannel.DEVICE_PREFIX)) {
if (isBlockDevice(path)) {
// block device
int capacity = BlockWALService.WAL_HEADER_TOTAL_CAPACITY;
WALChannel channel = WALChannel.builder(path).capacity(capacity).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;

public class WALBlockDeviceChannel implements WALChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
Expand Down Expand Up @@ -98,7 +99,7 @@ public static String checkAvailable(String path) {
return "java.nio.DirectByteBuffer.<init>(long, int) not available." +
" Add --add-opens=java.base/java.nio=ALL-UNNAMED and -Dio.netty.tryReflectionSetAccessible=true to JVM options may fix this.";
}
if (!path.startsWith(DEVICE_PREFIX)) {
if (!isBlockDevice(path)) {
String reason = tryOpenFileWithDirectIO(String.format(CHECK_DIRECT_IO_AVAILABLE_FORMAT, path));
if (null != reason) {
return "O_DIRECT not supported by the file system, path: " + path + ", reason: " + reason;
Expand All @@ -109,23 +110,27 @@ public static String checkAvailable(String path) {

/**
* Try to open a file with O_DIRECT flag to check whether the file system supports O_DIRECT.
* NOTE: The file is not actually created.
* The file will be deleted after the test.
*
* @return null if the file is opened successfully, otherwise the reason why it's not available
*/
private static String tryOpenFileWithDirectIO(String path) {
File file = new File(path);
try {
DirectRandomAccessFile randomAccessFile = new DirectRandomAccessFile(new File(path), "rw");
DirectRandomAccessFile randomAccessFile = new DirectRandomAccessFile(file, "rw");
randomAccessFile.close();
return null;
} catch (IOException e) {
return e.getMessage();
} finally {
// the file may be created in {@link DirectRandomAccessFile(File, String)}, so delete it
file.delete();
}
}

@Override
public void open(CapacityReader reader) throws IOException {
if (!path.startsWith(WALChannel.DEVICE_PREFIX)) {
if (!isBlockDevice(path)) {
openAndCheckFile();
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;
import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice;

/**
* There are two implementations of WALChannel:
Expand All @@ -30,7 +31,6 @@ public interface WALChannel {

Logger LOGGER = LoggerFactory.getLogger(WALChannel.class);

String DEVICE_PREFIX = "/dev/";
long DEFAULT_RETRY_INTERVAL = 100L;

static WALChannelBuilder builder(String path) {
Expand Down Expand Up @@ -248,7 +248,7 @@ public WALChannel build() {
if (direct != null) {
// Set by user.
useDirect = direct;
} else if (path.startsWith(DEVICE_PREFIX)) {
} else if (isBlockDevice(path)) {
// We can only use direct IO for block devices.
useDirect = true;
} else if (directNotAvailableMsg == null) {
Expand Down
14 changes: 14 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import jnr.posix.POSIXFactory;

public class WALUtil {
public static final int BLOCK_SIZE = Integer.parseInt(System.getProperty(
Expand Down Expand Up @@ -116,4 +117,17 @@ public static long getBlockDeviceCapacity(String path) throws ExecutionException
}
return Long.parseLong(result.stdout().trim());
}

/**
* Check if the given path is a block device.
* It returns false if the path does not exist.
*/
public static boolean isBlockDevice(String path) {
if (!new File(path).exists()) {
return false;
}
return POSIXFactory.getPOSIX()
.stat(path)
.isBlockDev();
}
}

0 comments on commit 31bfba3

Please sign in to comment.