Skip to content

Commit

Permalink
Remove time triggered
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Oct 9, 2023
1 parent 2577ead commit f08ac75
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,18 @@ long computeQueueThreshold() {
Optional<StreamDescriptor> getNextStreamToFlush(final long queueSizeThresholdBytes) {
for (final StreamDescriptor stream : orderStreamsByPriority(bufferDequeue.getBufferedStreams())) {
final long latestFlushTimeMs = latestFlushTimeMsPerStream.computeIfAbsent(stream, _k -> nowProvider.millis());
final ImmutablePair<Boolean, String> isTimeTriggeredResult = isTimeTriggered(latestFlushTimeMs);
// final ImmutablePair<Boolean, String> isTimeTriggeredResult = isTimeTriggered(latestFlushTimeMs);
final ImmutablePair<Boolean, String> isSizeTriggeredResult = isSizeTriggered(stream, queueSizeThresholdBytes);

final String debugString = String.format(
"trigger info: %s - %s, %s , %s",
stream.getNamespace(),
stream.getName(),
isTimeTriggeredResult.getRight(),
"nope"// isTimeTriggeredResult.getRight(),
isSizeTriggeredResult.getRight());
log.debug("computed: {}", debugString);

if (isSizeTriggeredResult.getLeft() || isTimeTriggeredResult.getLeft()) {
if (isSizeTriggeredResult.getLeft()/* || isTimeTriggeredResult.getLeft()*/) {
log.info("flushing: {}", debugString);
latestFlushTimeMsPerStream.put(stream, nowProvider.millis());
return Optional.of(stream);
Expand Down

0 comments on commit f08ac75

Please sign in to comment.