Skip to content

Commit

Permalink
writers are now callables
Browse files Browse the repository at this point in the history
Signed-off-by: Atanas Atanasov <[email protected]>
  • Loading branch information
ata-nas committed Jan 29, 2025
1 parent ad1fe33 commit eb953a8
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence;

import com.hedera.block.server.ack.AckHandler;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.events.BlockNodeEventHandler;
import com.hedera.block.server.events.ObjectEvent;
Expand Down Expand Up @@ -163,8 +162,6 @@ static Compression providesCompression(@NonNull final PersistenceStorageConfig c
*
* @param config the persistence storage config needed to discern the type
* of the async block writer factory
* @param blockNodeContext the application context
* @param ackHandler the acknowledgement handler
* @param blockPathResolver the block path resolver
* @param compression the compression used
* @return an async block writer factory singleton
Expand All @@ -173,14 +170,11 @@ static Compression providesCompression(@NonNull final PersistenceStorageConfig c
@Singleton
static AsyncBlockWriterFactory providesAsyncBlockWriterFactory(
@NonNull final PersistenceStorageConfig config,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final AckHandler ackHandler,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final Compression compression) {
final StorageType type = config.type();
return switch (type) {
case BLOCK_AS_LOCAL_FILE -> new BlockAsLocalFileAsyncWriterFactory(
blockNodeContext, ackHandler, blockPathResolver, compression);
case BLOCK_AS_LOCAL_FILE -> new BlockAsLocalFileAsyncWriterFactory(blockPathResolver, compression);
case BLOCK_AS_LOCAL_DIRECTORY, NO_OP -> throw new UnsupportedOperationException("not implemented yet");
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.StreamPersistenceHandlerError;
import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.util.Objects.requireNonNull;

import com.hedera.block.common.utils.Preconditions;
Expand All @@ -16,6 +18,8 @@
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriter;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult.BlockPersistenceStatus;
import com.hedera.block.server.persistence.storage.write.BlockWriter;
import com.hedera.block.server.persistence.storage.write.factory.AsyncBlockWriterFactory;
import com.hedera.block.server.service.ServiceStatus;
Expand All @@ -28,6 +32,13 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import javax.inject.Inject;
import javax.inject.Singleton;

Expand All @@ -52,7 +63,8 @@ public class StreamPersistenceHandlerImpl
private final ServiceStatus serviceStatus;
private final AckHandler ackHandler;
private final AsyncBlockWriterFactory asyncBlockWriterFactory;
private AsyncBlockWriter asyncBlockWriter;
private final CompletionService<BlockPersistenceResult> completionService;
private LinkedTransferQueue<BlockItemUnparsed> currentWriterQueue;

/**
* Constructs a new StreamPersistenceHandlerImpl instance with the given subscription handler,
Expand Down Expand Up @@ -82,6 +94,7 @@ public StreamPersistenceHandlerImpl(
this.serviceStatus = requireNonNull(serviceStatus);
this.ackHandler = requireNonNull(ackHandler);
this.asyncBlockWriterFactory = null;
this.completionService = null;
}

// @todo(545) temp constructor so not to break the build for POC
Expand All @@ -101,6 +114,8 @@ public StreamPersistenceHandlerImpl(
this.serviceStatus = Objects.requireNonNull(serviceStatus);
this.ackHandler = Objects.requireNonNull(ackHandler);
this.asyncBlockWriterFactory = Objects.requireNonNull(asyncBlockWriterFactory);
this.completionService = new ExecutorCompletionService<>(Executors.newFixedThreadPool(10));
// @todo(545) what would be the correct setup for the executor service and also add shutdown logic
}

/**
Expand Down Expand Up @@ -167,18 +182,54 @@ public void onEvent(final ObjectEvent<SubscribeStreamResponseUnparsed> event, lo
}
}

private void handleBlockItems(final List<BlockItemUnparsed> blockItems) throws ParseException {
private void handleBlockItems(final List<BlockItemUnparsed> blockItems)
throws ParseException, BlockStreamProtocolException {
final BlockItemUnparsed firstItem = blockItems.getFirst();
if (firstItem.hasBlockHeader()) {
final long blockNumber = Preconditions.requireWhole(
BlockHeader.PROTOBUF.parse(firstItem.blockHeader()).number());
asyncBlockWriter = asyncBlockWriterFactory.create(blockNumber);
if (Objects.nonNull(currentWriterQueue)) {
// we do not expect to enter here, but if we have, this means that a block header was found
// before the previous block was completed (no block proof received).
// @todo(545) what would be the correct message/exception type here?
throw new BlockStreamProtocolException("BlockHeader found before the previous block was completed.");
} else {
final BlockHeader header = BlockHeader.PROTOBUF.parse(firstItem.blockHeader());
final long blockNumber = Preconditions.requireWhole(header.number());
final AsyncBlockWriter writer = asyncBlockWriterFactory.create(blockNumber);
currentWriterQueue = writer.getQueue();
completionService.submit(writer);
}
}
for (int i = 0; i < blockItems.size(); i++) {
asyncBlockWriter.getQueue().offer(blockItems.get(i));
currentWriterQueue.offer(blockItems.get(i));
}
if (blockItems.getLast().hasBlockProof()) {
asyncBlockWriter.flagReady();
currentWriterQueue = null;
}
Future<BlockPersistenceResult> completionResult = completionService.poll();
while (Objects.nonNull(completionResult)) {
// @todo(545) is this loop really the best way to poll for the callables? Is there not a reactive way
// to do this?
try {
final BlockPersistenceResult blockPersistenceResult = completionResult.get();
if (blockPersistenceResult.status().equals(BlockPersistenceStatus.SUCCESS)) {
final long blockNumber = blockPersistenceResult.blockNumber();
ackHandler.blockPersisted(blockNumber);
metricsService.get(BlocksPersisted).increment();
LOGGER.log(INFO, "Block [%s] persisted successfully.".formatted(blockNumber));
completionResult = completionService.poll();
} else {
// @todo(545) should we handle non successful statuses directly here?
}
// @todo(545) what would be the correct exception handling here?
// since we are expected to return a result (even for expected fails), exceptions should most
// probably be treated as a bug
} catch (final ExecutionException e) {
throw new RuntimeException(e);
} catch (final CancellationException e) {
throw new RuntimeException(e);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,11 @@

import com.hedera.hapi.block.BlockItemUnparsed;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedTransferQueue;

/**
* An asynchronous block writer. Responsible for persisting a block to storage
* in a non-blocking manner. The writer is expected to receive block items via
* a queue and persist the block (which is made up of all received items)
* once all items have been received (flagged externally). The block writer is
* not concerned with making any decisions as to if it should persist a block,
* rather, it only knows how to persist a block. Once it has been flagged, the
* writer should persist the block using the items that have been received so
* and should then seize to exist. Writers are not reusable and should be
* self-contained.
*/
public interface AsyncBlockWriter {
/**
* This method returns a queue that the writer uses to receive block items.
*
* @return the queue that this writer uses to receive block items
*/
// @todo(545): add documentation
public interface AsyncBlockWriter extends Callable<BlockPersistenceResult> {
@NonNull
LinkedTransferQueue<BlockItemUnparsed> getQueue();

/**
* This method flags the writer that it should no longer expect any more
* block items to be added to the queue and should proceed to persist the
* block using the items that have been received so far.
*/
void flagReady();
}
Original file line number Diff line number Diff line change
@@ -1,95 +1,77 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.write;

import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.BlocksPersisted;

import com.hedera.block.common.utils.FileUtilities;
import com.hedera.block.server.ack.AckHandler;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.server.persistence.storage.compression.Compression;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.write.BlockPersistenceResult.BlockPersistenceStatus;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.BlockUnparsed;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

/**
* An async block writer that handles writing of blocks as a file to local
* storage.
*/
public class BlockAsLocalFileAsyncWriter implements AsyncBlockWriter {
private static final Logger LOGGER = System.getLogger(BlockAsLocalFileAsyncWriter.class.getName());
private final MetricsService metricsService;
private final AckHandler ackHandler;
private final BlockPathResolver blockPathResolver;
private final Compression compression;
private final LinkedTransferQueue<BlockItemUnparsed> queue;
private final long blockNumber;
private volatile boolean ready = false;

public BlockAsLocalFileAsyncWriter(
@NonNull final MetricsService metricsService,
@NonNull final AckHandler ackHandler,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final Compression compression,
final long blockNumber) {
this.metricsService = Objects.requireNonNull(metricsService);
this.ackHandler = Objects.requireNonNull(ackHandler);
this.blockPathResolver = Objects.requireNonNull(blockPathResolver);
this.compression = Objects.requireNonNull(compression);
this.blockNumber = blockNumber;
this.blockNumber = Preconditions.requireWhole(blockNumber);
this.queue = new LinkedTransferQueue<>();
runAsync();
}

private void runAsync() {
final Runnable task = () -> {
final List<BlockItemUnparsed> localBlockItems = new LinkedList<>();
while (!ready) { // wait until received all items
try {
final BlockItemUnparsed nextItem = queue.poll(1, TimeUnit.MILLISECONDS);
if (Objects.nonNull(nextItem)) { // poll returns null if timed out
localBlockItems.add(nextItem);
}
} catch (final InterruptedException e) {
throw new RuntimeException(e); // @todo(545) how to handle?!
@Override
public BlockPersistenceResult call() throws Exception {
boolean ready = false;
final List<BlockItemUnparsed> localBlockItems = new LinkedList<>();
while (!ready) { // loop until received all items (until block proof arrives)
try {
final BlockItemUnparsed nextItem = queue.take();
// @todo(545) if the block proof never arrives, we need to "know" here, so this take needs to
// unblock and we need to handle that case, we need to delete any side effects and return a
// failure status, but how to do that most efficiently? We work only with the queue outside of
// the callable.
localBlockItems.add(nextItem);
if (nextItem.hasBlockProof()) {
ready = true;
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
// @todo(545) how to handle?! maybe here we need to do cleanup and then return failure status
// that would cause the persistence to fail/restart?
}
// proceed to persist the items
try (final OutputStream out = compression.wrap(Files.newOutputStream(getResolvedBlockPath()))) {
final BlockUnparsed blockToWrite =
BlockUnparsed.newBuilder().blockItems(localBlockItems).build();
BlockUnparsed.PROTOBUF.toBytes(blockToWrite).writeTo(out);
} catch (final IOException e) {
throw new RuntimeException(e); // @todo(545) how to handle?!
}
metricsService.get(BlocksPersisted).increment();
ackHandler.blockPersisted(blockNumber);
final String blockPersistedLog = "Block [%d] persisted".formatted(blockNumber);
LOGGER.log(Level.INFO, blockPersistedLog);
// @todo(545) I need to use the notifier here? what then would happen if let's say block 2 finishes before
// block 1?
};
CompletableFuture.runAsync(task); // @todo(545) what would be the correct way to run this task async?
}

private Path getResolvedBlockPath() throws IOException {
final Path rawBlockPath = blockPathResolver.resolveLiveRawPathToBlock(blockNumber);
final Path resolvedBlockPath =
FileUtilities.appendExtension(rawBlockPath, compression.getCompressionFileExtension());
FileUtilities.createFile(resolvedBlockPath);
return resolvedBlockPath;
}
// proceed to persist the items
try (final WritableStreamingData wsd =
new WritableStreamingData(compression.wrap(Files.newOutputStream(getResolvedBlockPath())))) {
final BlockUnparsed blockToWrite =
BlockUnparsed.newBuilder().blockItems(localBlockItems).build();
BlockUnparsed.PROTOBUF.toBytes(blockToWrite).writeTo(wsd);
} catch (final IOException e) {
throw new RuntimeException(e);
// @todo(545) how to handle?!
// one option would be to retry let's say once and then if we fail we return a failure status
// but do cleanup before returning
}
return new BlockPersistenceResult(blockNumber, BlockPersistenceStatus.SUCCESS);
}

@NonNull
Expand All @@ -98,8 +80,11 @@ public LinkedTransferQueue<BlockItemUnparsed> getQueue() {
return queue;
}

@Override
public void flagReady() {
this.ready = true;
private Path getResolvedBlockPath() throws IOException {
final Path rawBlockPath = blockPathResolver.resolveLiveRawPathToBlock(blockNumber);
final Path resolvedBlockPath =
FileUtilities.appendExtension(rawBlockPath, compression.getCompressionFileExtension());
FileUtilities.createFile(resolvedBlockPath);
return resolvedBlockPath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.write;

import com.hedera.block.common.utils.Preconditions;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
 * The result of a single block persistence attempt, produced by an
 * {@link AsyncBlockWriter}.
 *
 * @param blockNumber the number of the persisted block; must be a whole
 *     (non-negative) number
 * @param status the outcome of the persistence attempt; must be non-null
 */
public record BlockPersistenceResult(long blockNumber, @NonNull BlockPersistenceStatus status) {
    public BlockPersistenceResult {
        Preconditions.requireWhole(blockNumber);
        Objects.requireNonNull(status);
    }

    /** The possible outcomes of a block persistence attempt. */
    public enum BlockPersistenceStatus {
        SUCCESS,
        FAILURE
    }
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.write.factory;

import com.hedera.block.server.ack.AckHandler;
import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.persistence.storage.compression.Compression;
import com.hedera.block.server.persistence.storage.path.BlockPathResolver;
import com.hedera.block.server.persistence.storage.write.AsyncBlockWriter;
Expand All @@ -16,26 +13,19 @@
* Factory for creating {@link BlockAsLocalFileAsyncWriter} instances.
*/
public class BlockAsLocalFileAsyncWriterFactory implements AsyncBlockWriterFactory {
private final MetricsService metricsService;
private final AckHandler ackHandler;
private final BlockPathResolver blockPathResolver;
private final Compression compression;

@Inject
public BlockAsLocalFileAsyncWriterFactory(
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final AckHandler ackHandler,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final Compression compression) {
this.metricsService = blockNodeContext.metricsService();
this.ackHandler = Objects.requireNonNull(ackHandler);
@NonNull final BlockPathResolver blockPathResolver, @NonNull final Compression compression) {
this.blockPathResolver = Objects.requireNonNull(blockPathResolver);
this.compression = Objects.requireNonNull(compression);
}

@NonNull
@Override
public AsyncBlockWriter create(final long blockNumber) {
return new BlockAsLocalFileAsyncWriter(metricsService, ackHandler, blockPathResolver, compression, blockNumber);
return new BlockAsLocalFileAsyncWriter(blockPathResolver, compression, blockNumber);
}
}

0 comments on commit eb953a8

Please sign in to comment.