NIFI-14110 Support to limit content size in PackageFlowFile #9595
Due to using a different API to retrieve the FlowFiles, the behaviour when working with multiple queues is no longer unspecified.
  return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
  }

- if ((size + flowFile.getSize() > maxBytes) || (count + 1 > maxCount)) {
+ if (size > maxBytes || count > maxCount) {
In the previous code, count and size were not changed for this case. Isn't reject-and-terminate a terminal case, so that this FlowFileFilterResult is not used further and those values are never referenced again?
Thank you for your review feedback @joewitt.
I'm not sure I correctly understood your comment. Please clarify if the following doesn't address your point.
My goal was to simplify the implementation by unifying the three points of computation (inside the first if statement, inside the predicate of the second if, and before the last return) into a single point.
I added test cases for a few selected scenarios before adjusting the implementation to ensure that the behavior remains the same.
The JavaDoc states:
Returns a new {@link FlowFileFilter} that will pull FlowFiles until the maximum file size has been reached, or the maximum FlowFile Count was been reached ...
Therefore, once either the maximum count or accumulated size limit is reached, all subsequent FlowFiles should be rejected.
Upon closer inspection, the old implementation contained a minor bug related to this behavior. If a FlowFile exceeding the size limit was initially rejected (with REJECT_AND_TERMINATE), and the caller subsequently passed an empty FlowFile to the filter, the filter would incorrectly return ACCEPT_AND_CONTINUE even though the limit had already been reached. While this behavior is technically incorrect, it's primarily a consequence of the caller not adhering to the FlowFileFilterResult contract and thus not a critical bug.
I enhanced one of the test cases to demonstrate this behavior.
Both the old and new implementations are susceptible to potential integer overflow issues for both count and size. I'm not sure if that's something worth addressing though.
I'm content with leaving the implementation of newSizeBasedFilter unchanged if that's preferred. I encountered the code while working on the desired changes for PackageFlowFile and deemed it worth some tests and a minor refactor.
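The overflow concern mentioned in this comment can be illustrated with a small sketch. It is not part of the PR and not NiFi code: the class and method names are hypothetical, and it assumes that sizes are non-negative long values. It shows one possible way to clamp a running total instead of letting it wrap around.

```java
/** Hypothetical helper, not part of NiFi: clamps a running total at Long.MAX_VALUE. */
final class SaturatingTotals {

    /** Adds two non-negative values; returns Long.MAX_VALUE instead of wrapping around. */
    static long addSaturating(long total, long increment) {
        try {
            // Math.addExact throws ArithmeticException when the sum overflows a long.
            return Math.addExact(total, increment);
        } catch (ArithmeticException overflow) {
            // Assumption: both inputs are non-negative, so overflow can only be positive.
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long nearLimit = Long.MAX_VALUE - 1;
        System.out.println(nearLimit + 5);                   // plain addition wraps to a negative value
        System.out.println(addSaturating(nearLimit, 5));     // clamped total stays at the maximum
    }
}
```

A clamped total keeps comparisons such as size > maxBytes meaningful after an overflow, whereas a wrapped negative total would make the limit check pass again.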
I dove in to understand this code and the framework's usage of it, and the changes look good to me, and are a bit more readable. The framework does not currently call filter() again if the FilterResult indicates TERMINATE. But if this changes in the future, this code continues to return REJECT_AND_TERMINATE on following calls to filter(), as expected.
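The behaviour discussed in this thread can be sketched in isolation. The following is a minimal illustration, not the code from the PR: SketchResult and SizeBasedFilterSketch are hypothetical stand-ins for FlowFileFilterResult and the filter returned by newSizeBasedFilter, the filter takes a content size instead of a FlowFile, and it assumes that the first FlowFile is always accepted.

```java
/** Hypothetical stand-in for NiFi's FlowFileFilterResult; only the two values used here. */
enum SketchResult { ACCEPT_AND_CONTINUE, REJECT_AND_TERMINATE }

/** Sketch of a size- and count-limited filter with a single point of computation. */
final class SizeBasedFilterSketch {
    private final long maxBytes;
    private final int maxCount;
    private long size = 0L;
    private int count = 0;

    SizeBasedFilterSketch(long maxBytes, int maxCount) {
        this.maxBytes = maxBytes;
        this.maxCount = maxCount;
    }

    /** Takes the content size of the next FlowFile rather than a FlowFile object. */
    SketchResult filter(long flowFileSize) {
        // Single point of computation: update the running totals before any check.
        count++;
        size += flowFileSize;

        // Assumption: the first FlowFile is always accepted, even if it exceeds maxBytes.
        if (count == 1) {
            return SketchResult.ACCEPT_AND_CONTINUE;
        }
        // The totals never decrease, so once a limit is exceeded every later call rejects too.
        if (size > maxBytes || count > maxCount) {
            return SketchResult.REJECT_AND_TERMINATE;
        }
        return SketchResult.ACCEPT_AND_CONTINUE;
    }

    public static void main(String[] args) {
        SizeBasedFilterSketch filter = new SizeBasedFilterSketch(10L, 5);
        System.out.println(filter.filter(4L)); // first FlowFile
        System.out.println(filter.filter(8L)); // would exceed the 10-byte limit
        System.out.println(filter.filter(0L)); // empty FlowFile after a rejection
    }
}
```

Because the rejected FlowFile's size is already included in the running total, an empty FlowFile passed afterwards is rejected as well, which is the scenario described for the old implementation's bug.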
I recommend modifying a MultiProcessorUseCase or creating another UseCase in order to make the documentation for combining the two Batch Size properties clear. It's very important to be clear that FlowFiles will not be delayed in the input queue waiting for a batch size to be reached. It's also very important to support packaging exactly 1 FlowFile. While this improvement appears to be worthwhile, we should be very careful with configuration creep on PackageFlowFile. Its only justification for existence is to be easier to use than MergeContent for a specific use case. Too many features would ruin that justification.
Thank you for the useful feedback @mosermw. I've adjusted the documentation of the UseCases to clarify the batching behaviour of the processor.
Personally, I do not intend to add other properties to the processor at the moment.
I reviewed and tested this in various scenarios in running NiFi and just have a few minor comments.
    private int claimFlowFileId() {
        return flowFileId++;
    }
}
Would you add a newline to the end of this file?
It can be more efficient to transmit one larger package of batched FlowFiles than each FlowFile packaged separately.
This line appears to be redundant with the BATCHING_BEHAVIOUR_DESCRIPTION, should we remove it?
.description("Maximum combined content size of FlowFiles to package into one output FlowFile. " +
        "Note, that FlowFiles whose content exceeds this limit are packaged separately.")
.required(true)
.defaultValue("1 GB")
The default Maximum Batch Size is "1", indicating a desire for default configuration to package 1 FlowFile per output. I think we should maintain that default behavior with this new Maximum Batch Content Size property. Would you make its default something like "0 MB" so that 1 FlowFile is packaged?
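To make the interplay of the two limits concrete, here is a rough simulation of the batching rule. It is a sketch under assumptions, not PackageFlowFile's implementation: BatchingSketch and batch are hypothetical names, FlowFiles are reduced to their content sizes in bytes, and the first FlowFile of each package is assumed to be accepted regardless of its size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation, not NiFi code: groups content sizes into packages. */
final class BatchingSketch {

    /** Splits the queued sizes into batches limited by total bytes and by FlowFile count. */
    static List<List<Long>> batch(List<Long> sizes, long maxBytes, int maxCount) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long total = 0L;
        for (long size : sizes) {
            // Assumption: an empty batch always takes the next FlowFile, whatever its size.
            boolean full = !current.isEmpty()
                    && (total + size > maxBytes || current.size() + 1 > maxCount);
            if (full) {
                batches.add(current);
                current = new ArrayList<>();
                total = 0L;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> queue = List.of(5L, 5L, 5L);
        System.out.println(batch(queue, 0L, 1000));   // content limit of 0 bytes
        System.out.println(batch(queue, 10L, 1000));  // content limit of 10 bytes
        System.out.println(batch(List.of(3L, 50L, 3L), 10L, 1000)); // one oversized FlowFile
    }
}
```

Under these assumptions, a content limit of 0 bytes puts every non-empty FlowFile into its own package, and a FlowFile larger than the limit ends up alone in a package, which is why the limit acts as a soft upper bound.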
"Maximum Batch Size" > 1 can improve storage or transmission efficiency by batching many FlowFiles together into 1 larger file.
"Maximum Batch Content Size" can be used to enforce a soft upper limit on the overall package size.

Note, that the Batch properties only restrict the maximum amount of FlowFiles to incorporate into a single package.
Thanks for including this note! It's important.
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();

private static final List<PropertyDescriptor> PROPERTIES = List.of(BATCH_SIZE, BATCH_CONTENT_SIZE);
There is a conflict with the main branch related to the List of PropertyDescriptors. By convention, try to keep 1 PropertyDescriptor per line for readability.
I had a circular dependency problem when depending on nifi-mock from nifi-utils, which is why I use an anonymous implementation of FlowFile inside the tests instead of MockFlowFile.
Summary
NIFI-14110