-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Simplified asynchronous block writers #545
base: main
Are you sure you want to change the base?
Conversation
Comment at add20d7:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only part-way through, but wanted to get these comments out to support making appropriate adjustments.
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/storage/write/AsyncBlockWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/storage/write/AsyncBlockWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/BlockAsLocalFileAsyncWriter.java
Outdated
Show resolved
Hide resolved
/** | ||
* Factory for creating {@link BlockAsLocalFileAsyncWriter} instances. | ||
*/ | ||
public class BlockAsLocalFileAsyncWriterFactory implements AsyncBlockWriterFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having a factory and interface for a class that shouldn't have multiple implementations and is only ever used in one place seems overkill.
I suspect that a simple new
is more than sufficient for block writer tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would agree, however I am pro keeping the factory, here my thoughts:
- The factory remains a single injection point, meaning if tomorrow the writer requires another dependency to be injected into it, it will be done via the factory (injected into the factory, which then has access to said dependency and is able to then create instances using this reference)
- Whoever requires the ability to create writers, only needs a ref to the factory and will not need to supply dependencies to itself (which might not be needed) for any other reason than to create writers.
- I am under the impression that we should also have a
NO_OP
version of the factory even now, so the interface seems to be mandatory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thoughts on those points:
- A simple constructor is a single injection point.
- There is almost not chance any entity other than the handler will need to create a writer.
- The writer's dependencies shouldn't really need to be injected, aside from things the handler already requires (such as persistence configuration).
- I'm not sure the value of a no-op implementation. For testing we're better off using the real implementation with a mock filesystem, in my opinion.
I generally would advocate for the simplest approach that is likely to work; adding complexity (including interfaces and factories) only when it's clear they're actually necessary.
That said, it's your PR, so this is just suggestion, not requirement.
17c4698
to
d4fcffa
Compare
1c0a0ff
to
ad1fe33
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few more thoughts as you continue to work on this.
if (Objects.nonNull(currentWriterQueue)) { | ||
// we do not expect to enter here, but if we have, this means that a block header was found | ||
// before the previous block was completed (no block proof received). | ||
// @todo(545) what would be the correct message/exception type here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
This is an incomplete block, and we should publish a PersistenceResult
of type "INCOMPLETE_BLOCK" (or equivalent exception until the services are disentangled).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have extended the result types, have made the placeholder for it here, but for now I throw, same as the bad block number. Need the ack handler to be able to accept persistence results before migrating to that. But for the future remains to be discussed what shall we do with the persistence handler and other tasks in the completion service if we land on these statuses. Should we shut down the handler, kill all tasks etc. etc.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are not fatal events, the handler should continue because the sources that feed blocks to the handler should handle the situation (which may be normal processing where we detected issues elsewhere and switched sources, requested a resend, etc...).
Generally, if the persistence handler can return a meaningful result other than "internal error", it should continue operating and assume some other service will do what is needed to get the block node, as a whole, back on track.
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/StreamPersistenceHandlerImpl.java
Outdated
Show resolved
Hide resolved
} catch (final InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InterruptedException
is never an error, and should never be wrapped in another exception (though it may be declared and propagated up).
When an InterruptedException
is caught, the process is expected to close any open resources and exit. Here, we have no open resources (though the individual writers may) and are already the top-level onEvent method, so we should just call Thread.currentThread().interrupt()
(which signals any other locks, or conditions on the same thread) and return from this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the IDE generates this by default, I prefer to actually do the proper handling when I actually know what to do, hence you see the todo above the catch clauses. But yes, what you say is absolutely right, I believe we should have some teardown for the handler since it is the handler's thread that needs to be interrupted while waiting on the get method of the future to produce this catch. I will address this.
On another note, we should also have/execute the teardown when shutting down the server, which sounds like it is another issue that needs to be addressed separately. I believe it is probably a good idea for us to have teardown logic on all our components in order to make sure we shutdown gracefully. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note, InterruptedException
is the "standard" means of shutting down a server...
public BlockPersistenceResult call() throws Exception { | ||
boolean ready = false; | ||
final List<BlockItemUnparsed> localBlockItems = new LinkedList<>(); | ||
while (!ready) { // loop until received all items (until block proof arrives) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we accumulating everything (and copying all the data) until the entire block is available?
We should write each item as it arrives, not wait for the whole block.
The only thing that should happen when a block proof is encountered is to close the output stream (writable streaming data below).
That approach can complete with much lower latency and avoids copying the data into a local list.
Basically the try-with-resources would wrap the while loop (which would write each item instead of adding to a list) instead of following the while loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jsync-swirlds I must be missing something here, maybe it is related to the usage of the writable streaming data and how to use it, are there any other examples I can use for this API that we have? (appologies for asking explicitly, seems to be something internal and I am unaware of some usage of the API we have, I have seen examples of creating the whole block and then writing that to the fs once it is all accumulated, or I am misunderstanding something)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rather wondered why you were using the WritableStreamingData
class...
I do not believe it's necessary.
The following example rewrite seems much simpler and cleaner, to my eye:
public BlockPersistenceResult call() throws IOException {
boolean blockComplete = false;
try (final var outputStream = compression.wrap(Files.newOutputStream(getResolvedBlockPath()))) {
while (!blockComplete) { // loop until received all items (until block proof arrives)
try {
final BlockItemUnparsed nextItem = queue.take();
// @todo(545) if the block proof never arrives, we need to "know" here, so this take needs to
// unblock and we need to handle that case, we need to delete any side effects and return a
// failure status, but how to do that most efficiently? We work only with the queue outside of
// the callable.
BlockItemUnparsed.PROTOBUF.toBytes(nextItem).writeTo(outputStream);
if (nextItem.hasBlockProof()) {
blockComplete = true;
}
} catch (final InterruptedException e) {
throw new RuntimeException(e);
// @todo(545) how to handle?! maybe here we need to do cleanup and then return failure status
// that would cause the persistence to fail/restart?
}
}
}
return new BlockPersistenceResult(blockNumber, BlockPersistenceStatus.SUCCESS);
}
Note, this also replaces the throws Exception
clause, with throws IOException
. When overriding an interface method, you can always declare a narrower throws clause than the interface; you just cannot declare a wider throws clause.
For Callables, when call
throws any exception, it's wrapped in an ExecutionException
and thrown by the corresponding Future.get()
call.
By allowing the IOException to throw, we can generate a "FAILED_TO_PERSIST" result in the handler instead of trying to do so here (making this code simpler).
This can go either way, of course. We could catch here and return the failure result, or catch in the handler and produce the failure result there. The effect is the same and it comes down to preference. I made the change here just to show that it could be done that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried that, unable to read blocks, looks like this is not a proper way to save the blocks. When I attempt to read them, apparently the reader is unable to parse the block when the block was saved in this manner.
P.S. also the writable streaming data write bytes does not work in the same manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is correct, we may need to modify the reader slightly (it might have an incorrect assumption).
Let's discuss separately (perhaps a huddle?) and figure out why the reader cannot read correctly when we write with this code.
WritableStreamingData is nothing more than a length-counting wrapper around the output stream in this use, so it really is not providing value here.
...main/java/com/hedera/block/server/persistence/storage/write/AsyncBlockAsLocalFileWriter.java
Outdated
Show resolved
Hide resolved
...main/java/com/hedera/block/server/persistence/storage/write/AsyncBlockAsLocalFileWriter.java
Outdated
Show resolved
Hide resolved
server/src/main/java/com/hedera/block/server/persistence/storage/write/AsyncBlockWriter.java
Outdated
Show resolved
Hide resolved
c4600c7
to
b00e87d
Compare
Signed-off-by: Atanas Atanasov <[email protected]>
b00e87d
to
f9b8af4
Compare
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
…ercondition to check for good block number Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
Signed-off-by: Atanas Atanasov <[email protected]>
…ed to revisit some tests to pass) Signed-off-by: Atanas Atanasov <[email protected]>
Description:
Related issue(s):
Fixes #516
Checklist