From d7de235536fe0496c965a9bfe1ec99cf3ada9509 Mon Sep 17 00:00:00 2001 From: sband Date: Thu, 8 Jun 2023 13:27:28 +0530 Subject: [PATCH] fix #582: Replace FileReader with FileChannelReader --- .../io/cloud/amazon/AmazonS3Storage.java | 89 ++-- .../cloud/amazon/AmazonS3StorageReader.java | 91 ++-- .../cloud/amazon/AmazonS3StorageWriter.java | 407 +++++++++--------- .../java/org/sirix/io/file/FileReader.java | 8 - .../io/filechannel/FileChannelReader.java | 7 + 5 files changed, 293 insertions(+), 309 deletions(-) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java index c832533c2..66e97dea3 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -48,57 +48,49 @@ public final class AmazonS3Storage implements ICloudStorage { * Revisions file name. */ private static final String REVISIONS_FILENAME = "sirix.revisions"; - - /** - * Instance to local storage. - */ - private final Path file; + + /** + * Instance to local storage. + */ + private final Path file; private S3Client s3Client; /** Logger. */ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3Storage.class)); - + /** * Byte handler pipeline. - */ - private final ByteHandlerPipeline byteHandlerPipeline; + */ + private final ByteHandlerPipeline byteHandlerPipeline; /** * Revision file data cache. - */ - private final AsyncCache cache; - - private ResourceConfiguration.AWSStorageInformation awsStorageInfo; + */ + private final AsyncCache cache; - private final AmazonS3StorageReader reader; + private ResourceConfiguration.AWSStorageInformation awsStorageInfo; + private final AmazonS3StorageReader reader; /** * Support AWS authentication only with .aws credentials file with the required * profile name from the creds file */ - public AmazonS3Storage(final ResourceConfiguration resourceConfig, - AsyncCache cache) { + public AmazonS3Storage(final ResourceConfiguration resourceConfig, AsyncCache cache) { this.awsStorageInfo = resourceConfig.awsStoreInfo; this.cache = cache; - this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; + this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; this.file = resourceConfig.resourcePath; - this.s3Client = getS3Client(); //this client is needed for the below checks, so initialize it here only. + this.s3Client = getS3Client(); // this client is needed for the below checks, so initialize it here only. String bucketName = awsStorageInfo.getBucketName(); boolean shouldCreateBucketIfNotExists = awsStorageInfo.shouldCreateBucketIfNotExists(); - if(!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { + if (!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { createBucket(bucketName); } - this.reader = new AmazonS3StorageReader(bucketName, - s3Client, - getDataFilePath().toAbsolutePath().toString(), - getRevisionFilePath().toAbsolutePath().toString(), - new ByteHandlerPipeline(this.byteHandlerPipeline), - SerializationType.DATA, - new PagePersister(), - cache.synchronous(), - resourceConfig); + this.reader = new AmazonS3StorageReader(bucketName, s3Client, getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), new ByteHandlerPipeline(this.byteHandlerPipeline), + SerializationType.DATA, new PagePersister(), cache.synchronous(), resourceConfig); } void createBucket(String bucketName) { @@ -121,38 +113,33 @@ void createBucket(String bucketName) { } boolean isBucketExists(String bucketName) { - HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); try { s3Client.headBucket(headBucketRequest); return true; } catch (NoSuchBucketException e) { return false; - } + } } S3Client getS3Client() { - return this.s3Client==null ? S3Client.builder() - .region(Region.of(awsStorageInfo.getAwsRegion())) - .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) - .build() : this.s3Client; + return this.s3Client == null + ? S3Client.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build() + : this.s3Client; } S3AsyncClient getAsyncS3Client() { - return S3AsyncClient.builder() - .region(Region.of(awsStorageInfo.getAwsRegion())) - .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())) - .build(); + return S3AsyncClient.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build(); } @Override public Writer createWriter() { - return new AmazonS3StorageWriter (getDataFilePath().toAbsolutePath().toString(), - getRevisionFilePath().toAbsolutePath().toString(), - awsStorageInfo.getBucketName(), - SerializationType.DATA,new PagePersister(), - cache,reader, - this.getAsyncS3Client()); + return new AmazonS3StorageWriter(getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), awsStorageInfo.getBucketName(), + SerializationType.DATA, new PagePersister(), cache, reader, this.getAsyncS3Client()); } @Override @@ -169,10 +156,10 @@ public void close() { public boolean exists() { Path storage = this.reader.readObjectDataFromS3(getDataFilePath().toAbsolutePath().toString()); try { - return Files.exists(storage) && Files.size(storage) > 0; - } catch (final IOException e) { - throw new UncheckedIOException(e); - } + return Files.exists(storage) && Files.size(storage) > 0; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } } @Override @@ -181,12 +168,12 @@ public ByteHandler getByteHandler() { } /** - * Getting path for data file. - * This path would be used on the local storage + * Getting path for data file. This path would be used on the local storage + * * @return the path for this data file */ private Path getDataFilePath() { - return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); } /** @@ -195,6 +182,6 @@ private Path getDataFilePath() { * @return the concrete storage for this database */ private Path getRevisionFilePath() { - return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); } } diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java index b1c96382e..ca36e3aec 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.io.OutputStream; import java.io.RandomAccessFile; -import java.nio.file.FileSystems; import java.nio.file.Path; import java.time.Instant; @@ -16,6 +15,7 @@ import org.sirix.io.RevisionFileData; import org.sirix.io.bytepipe.ByteHandler; import org.sirix.io.file.FileReader; +import org.sirix.io.filechannel.FileChannelReader; import org.sirix.page.PagePersister; import org.sirix.page.PageReference; import org.sirix.page.RevisionRootPage; @@ -34,11 +34,11 @@ import software.amazon.awssdk.services.s3.model.S3Exception; public class AmazonS3StorageReader implements Reader { - + /** * S3 storage bucket name * - */ + */ private final String bucketName; private final S3Client s3Client; @@ -46,71 +46,60 @@ public class AmazonS3StorageReader implements Reader { private final ResourceConfiguration resourceConfig; /** Logger. */ private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class)); - - - private FileReader reader; - - public AmazonS3StorageReader(String bucketName, - S3Client s3Client, - String dataFileKeyName, - String revisionsOffsetFileKeyName, - final ByteHandler byteHandler, - final SerializationType serializationType, - final PagePersister pagePersister, - final Cache cache, - ResourceConfiguration resourceConfig) { + + private FileChannelReader reader; + + public AmazonS3StorageReader(String bucketName, S3Client s3Client, String dataFileKeyName, + String revisionsOffsetFileKeyName, final ByteHandler byteHandler, final SerializationType serializationType, + final PagePersister pagePersister, final Cache cache, + ResourceConfiguration resourceConfig) { this.bucketName = bucketName; this.s3Client = s3Client; this.resourceConfig = resourceConfig; Path dataFilePath = readObjectDataFromS3(dataFileKeyName); Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName); try { - this.reader = new FileReader(new RandomAccessFile(dataFilePath.toFile(), "r"), - new RandomAccessFile(revisionOffsetFilePath.toFile(), "r"), - byteHandler, - serializationType, - pagePersister, - cache); - }catch(IOException io) { + this.reader = new FileChannelReader(new RandomAccessFile(dataFilePath.toFile(), "r").getChannel(), + new RandomAccessFile(revisionOffsetFilePath.toFile(), "r").getChannel(), byteHandler, serializationType, + pagePersister, cache); + } catch (IOException io) { LOGGER.error(io.getMessage()); System.exit(1); } } - + /** * @param keyName - Key name of the object to be read from S3 storage - * @return path - The location of the local file that contains the data that is written to the file system storage - * in the system temp directory. + * @return path - The location of the local file that contains the data that is + * written to the file system storage in the system temp directory. */ protected Path readObjectDataFromS3(String keyName) { - + try { - GetObjectRequest objectRequest = GetObjectRequest - .builder() - .key(keyName) - .bucket(bucketName) - .build(); - - ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); - byte[] data = objectBytes.asByteArray(); - /*As the bucketName has to be same as the database name, it makes sense to use/create file on the local filesystem - * instead of in the tmp partition*/ - Path path = resourceConfig.resourcePath; - // Write the data to a local file. - File myFile = path.toFile(); - try(OutputStream os = new FileOutputStream(myFile)){ - os.write(data); - } - return path; - } catch (IOException ex) { - ex.printStackTrace(); - } catch (S3Exception e) { - LOGGER.error(e.awsErrorDetails().errorMessage()); - System.exit(1); - } + GetObjectRequest objectRequest = GetObjectRequest.builder().key(keyName).bucket(bucketName).build(); + + ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); + byte[] data = objectBytes.asByteArray(); + /* + * As the bucketName has to be same as the database name, it makes sense to + * use/create file on the local filesystem instead of in the tmp partition + */ + Path path = resourceConfig.resourcePath; + // Write the data to a local file. + File myFile = path.toFile(); + try (OutputStream os = new FileOutputStream(myFile)) { + os.write(data); + } + return path; + } catch (IOException ex) { + ex.printStackTrace(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } return null; - } + } ByteHandler getByteHandler() { return this.reader.getByteHandler(); diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java index f758924a4..3b8d34c61 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -48,259 +48,268 @@ public class AmazonS3StorageWriter extends AbstractForwardingReader implements Writer { /** - * Random access to work on. - */ - private RandomAccessFile dataFile; + * Random access to work on. + */ + private RandomAccessFile dataFile; - /** - * {@link AmazonS3StorageReader} reference for this writer. - */ - private final AmazonS3StorageReader reader; + /** + * {@link AmazonS3StorageReader} reference for this writer. + */ + private final AmazonS3StorageReader reader; - private final SerializationType type; + private final SerializationType type; - private RandomAccessFile revisionsFile; + private RandomAccessFile revisionsFile; - private final PagePersister pagePersister; + private final PagePersister pagePersister; - private final AsyncCache cache; + private final AsyncCache cache; - private boolean isFirstUberPage; + private boolean isFirstUberPage; - private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); + private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); - private final S3AsyncClient s3Client; + private final S3AsyncClient s3Client; - private final String bucketName; + private final String bucketName; - /** Logger. */ - private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); - public AmazonS3StorageWriter (final String dataFileKeyName, final String revisionsOffsetFileKeyName, - final String bucketName, - final SerializationType serializationType, final PagePersister pagePersister, - final AsyncCache cache, final AmazonS3StorageReader reader, - final S3AsyncClient s3Client) { + public AmazonS3StorageWriter(final String dataFileKeyName, final String revisionsOffsetFileKeyName, + final String bucketName, final SerializationType serializationType, final PagePersister pagePersister, + final AsyncCache cache, final AmazonS3StorageReader reader, + final S3AsyncClient s3Client) { this.bucketName = bucketName; type = requireNonNull(serializationType); try { - this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(),"rw"); - this.revisionsFile = type == SerializationType.DATA ? - new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(),"rw") : null; - }catch(IOException io) { - LOGGER.error(String.format("Cannot create S3 storage writer, " - + "please check if DATA path OR Revision offset file path exists. Error details : %s", io.getMessage())); + this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(), + "rw"); + this.revisionsFile = type == SerializationType.DATA + ? new RandomAccessFile( + requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(), "rw") + : null; + } catch (IOException io) { + LOGGER.error(String.format( + "Cannot create S3 storage writer, " + + "please check if DATA path OR Revision offset file path exists. Error details : %s", + io.getMessage())); } - this.pagePersister = requireNonNull(pagePersister); - this.cache = cache; - this.reader = requireNonNull(reader); - this.s3Client = s3Client; + this.pagePersister = requireNonNull(pagePersister); + this.cache = cache; + this.reader = requireNonNull(reader); + this.s3Client = s3Client; } /** * @param bucketName - S3 bucket name on AWS - * @param keyName - Name of the file that includes the full path that is supposed to be used on the local file system - * @param object - File that could be read from the local filesystem that contains the actual information - * to be stored on S3 - * - * */ + * @param keyName - Name of the file that includes the full path that is + * supposed to be used on the local file system + * @param object - File that could be read from the local filesystem that + * contains the actual information to be stored on S3 + * + */ protected void writeObjectToS3(String keyName, File object, boolean isDataFile) { try { - Map metadata = new HashMap<>(); - metadata.put("x-amz-meta-sirix", isDataFile ? "data" : "revision"); - PutObjectRequest putOb = PutObjectRequest.builder() - .bucket(bucketName) - .key(keyName) - .metadata(metadata) - .build(); - - CompletableFuture objectFutureResponse = s3Client.putObject(putOb, - AsyncRequestBody.fromFile(object)); - objectFutureResponse.whenComplete((response, error) -> { - try { - if (response != null) { - LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName,bucketName)); - /*No need to delete/cleanup the file as we are writing on the local file system, so this avoid - * unnecessarily filling up the filesystem space*/ - } else { - // Handle error - error.printStackTrace(); - LOGGER.error(error.getMessage()); - System.exit(1); - } - } finally { - s3Client.close(); - } - }); - - objectFutureResponse.join(); - } catch (S3Exception e) { - LOGGER.error(e.awsErrorDetails().errorMessage()); - System.exit(1); - } + Map metadata = new HashMap<>(); + metadata.put("x-amz-meta-sirix", isDataFile ? "data" : "revision"); + PutObjectRequest putOb = PutObjectRequest.builder().bucket(bucketName).key(keyName).metadata(metadata) + .build(); + + CompletableFuture objectFutureResponse = s3Client.putObject(putOb, + AsyncRequestBody.fromFile(object)); + objectFutureResponse.whenComplete((response, error) -> { + try { + if (response != null) { + LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName, bucketName)); + /* + * No need to delete/cleanup the file as we are writing on the local file + * system, so this avoid unnecessarily filling up the filesystem space + */ + } else { + // Handle error + error.printStackTrace(); + LOGGER.error(error.getMessage()); + System.exit(1); + } + } finally { + s3Client.close(); + } + }); + + objectFutureResponse.join(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } } - @Override public void close() { try { - if (dataFile != null) { - dataFile.close(); - } - if (revisionsFile != null) { - revisionsFile.close(); - } - if (reader != null) { - reader.close(); - } - this.s3Client.close(); - } catch (final IOException e) { - throw new SirixIOException(e); - } + if (dataFile != null) { + dataFile.close(); + } + if (revisionsFile != null) { + revisionsFile.close(); + } + if (reader != null) { + reader.close(); + } + this.s3Client.close(); + } catch (final IOException e) { + throw new SirixIOException(e); + } } @Override public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { try { - final long fileSize = dataFile.length(); - long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; - return writePageReference(pageReadOnlyTrx, pageReference, offset); - } catch (final IOException e) { - throw new SirixIOException(e); - } + final long fileSize = dataFile.length(); + long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; + return writePageReference(pageReadOnlyTrx, pageReference, offset); + } catch (final IOException e) { + throw new SirixIOException(e); + } } private String getFileKeyName(String fileDescriptorPath) { - return fileDescriptorPath.substring((System.getProperty("java.io.tmpdir")+FileSystems.getDefault().getSeparator()).length()); + return fileDescriptorPath + .substring((System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator()).length()); } @NotNull - private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, final PageReference pageReference, - long offset) { - // Perform byte operations. - try { - // Serialize page. - final Page page = pageReference.getPage(); - - final byte[] serializedPage; - - try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); - final DataOutputStream dataOutput = new DataOutputStream(reader.getByteHandler().serialize(output))) { - pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); - final var byteArray = byteBufferBytes.toByteArray(); - dataOutput.write(byteArray); - dataOutput.flush(); - serializedPage = output.toByteArray(); - } - - byteBufferBytes.clear(); - - final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; - final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); - buffer.putInt(serializedPage.length); - buffer.put(serializedPage); - buffer.flip(); - buffer.get(writtenPage); - - // Getting actual offset and appending to the end of the current file. - if (type == SerializationType.DATA) { - if (page instanceof RevisionRootPage) { - if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { - offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); - } - } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { - offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); - } - } - dataFile.seek(offset); - dataFile.write(writtenPage); - /*Write the file object to S3*/ - this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); - - // Remember page coordinates. - pageReference.setKey(offset); - - if (page instanceof KeyValueLeafPage keyValueLeafPage) { - pageReference.setHash(keyValueLeafPage.getHashCode()); - } else { - /*TODO : Check for correctness of this*/ - pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); - } - - if (type == SerializationType.DATA) { - if (page instanceof RevisionRootPage revisionRootPage) { - if (revisionRootPage.getRevision() == 0) { - revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); - } else { - revisionsFile.seek(revisionsFile.length()); - } - revisionsFile.writeLong(offset); - revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); - if (cache != null) { - final long currOffset = offset; - cache.put(revisionRootPage.getRevision(), - CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, - Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); - } - } else if (page instanceof UberPage && isFirstUberPage) { - revisionsFile.seek(0); - revisionsFile.write(serializedPage); - revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); - revisionsFile.write(serializedPage); - } - this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE); - } - - return this; - } catch (final IOException e) { - throw new SirixIOException(e); - } - } + private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, + final PageReference pageReference, long offset) { + // Perform byte operations. + try { + // Serialize page. + final Page page = pageReference.getPage(); + + final byte[] serializedPage; + + try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); + final DataOutputStream dataOutput = new DataOutputStream( + reader.getByteHandler().serialize(output))) { + pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); + final var byteArray = byteBufferBytes.toByteArray(); + dataOutput.write(byteArray); + dataOutput.flush(); + serializedPage = output.toByteArray(); + } + + byteBufferBytes.clear(); + + final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; + final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); + buffer.putInt(serializedPage.length); + buffer.put(serializedPage); + buffer.flip(); + buffer.get(writtenPage); + + // Getting actual offset and appending to the end of the current file. + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage) { + if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { + offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); + } + } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { + offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); + } + } + dataFile.seek(offset); + dataFile.write(writtenPage); + /* Write the file object to S3 */ + this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), + new File(dataFile.getFD().toString()), Boolean.TRUE); + + // Remember page coordinates. + pageReference.setKey(offset); + + if (page instanceof KeyValueLeafPage keyValueLeafPage) { + pageReference.setHash(keyValueLeafPage.getHashCode()); + } else { + pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); + } + + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage revisionRootPage) { + if (revisionRootPage.getRevision() == 0) { + revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); + } else { + revisionsFile.seek(revisionsFile.length()); + } + revisionsFile.writeLong(offset); + revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); + if (cache != null) { + final long currOffset = offset; + cache.put(revisionRootPage.getRevision(), + CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, + Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); + } + } else if (page instanceof UberPage && isFirstUberPage) { + revisionsFile.seek(0); + revisionsFile.write(serializedPage); + revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); + revisionsFile.write(serializedPage); + } + this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), + new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + + return this; + } catch (final IOException e) { + throw new SirixIOException(e); + } + } @Override public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { isFirstUberPage = true; - writePageReference(pageReadOnlyTrx, pageReference, 0); - isFirstUberPage = false; - writePageReference(pageReadOnlyTrx, pageReference, 100); - return this; + writePageReference(pageReadOnlyTrx, pageReference, 0); + isFirstUberPage = false; + writePageReference(pageReadOnlyTrx, pageReference, 100); + return this; } @Override public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { try { - final var dataFileRevisionRootPageOffset = - cache.get(revision, (unused) -> getRevisionFileData(revision)).get(5, TimeUnit.SECONDS).offset(); - - // Read page from file. - dataFile.seek(dataFileRevisionRootPageOffset); - final int dataLength = dataFile.readInt(); - - dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength); - this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); - } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) { - throw new IllegalStateException(e); - } + final var dataFileRevisionRootPageOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)) + .get(5, TimeUnit.SECONDS).offset(); + + // Read page from file. + dataFile.seek(dataFileRevisionRootPageOffset); + final int dataLength = dataFile.readInt(); + + dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), + Boolean.TRUE); + } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) { + throw new IllegalStateException(e); + } - return this; + return this; } @Override public Writer truncate() { try { - dataFile.setLength(0); - this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), Boolean.TRUE); - if (revisionsFile != null) { - revisionsFile.setLength(0); - this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()), new File(revisionsFile.getFD().toString()), Boolean.FALSE); - } - } catch (final IOException e) { - throw new SirixIOException(e); - } - - return this; + dataFile.setLength(0); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), + Boolean.TRUE); + if (revisionsFile != null) { + revisionsFile.setLength(0); + this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()), + new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + } catch (final IOException e) { + throw new SirixIOException(e); + } + + return this; } @Override diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java index 0bbe9dd6d..0a9e5c6ea 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/file/FileReader.java @@ -205,12 +205,4 @@ public void close() { throw new SirixIOException(e); } } - - public ByteHandler getByteHandler() { - return this.byteHandler; - } - - public HashFunction getHashFunction() { - return this.hashFunction; - } } diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java index 195cc0074..9a32ac3cc 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java @@ -167,4 +167,11 @@ public RevisionFileData getRevisionFileData(int revision) { public void close() { } + public ByteHandler getByteHandler() { + return this.byteHandler; + } + + public HashFunction getHashFunction() { + return this.hashFunction; + } }