Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Oct 11, 2024
1 parent 7c8e9dd commit e7e2883
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ public boolean isUserCongested(UserCongestionControlContext userCongestionContro
}

UserIdentifier userIdentifier = userCongestionControlContext.getUserIdentifier();
long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
// If the user produce speed is higher that the avg consume speed, will congest it
if (overHighWatermark.get()) {
long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
long avgConsumeSpeed = getPotentialProduceSpeed();
if (userProduceSpeed > avgConsumeSpeed) {
if (logger.isDebugEnabled()) {
Expand All @@ -173,7 +173,6 @@ public boolean isUserCongested(UserCongestionControlContext userCongestionContro
}
}

long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
if (userProduceSpeed > userProduceSpeedHighWatermark) {
userCongestionControlContext.onCongestionControl();
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public boolean inCongestionControl() {
}

public void updateProduceBytes(long numBytes) {
long timeNow = System.currentTimeMillis();
BufferStatusHub.BufferStatusNode node = new BufferStatusHub.BufferStatusNode(numBytes);
userBufferInfo.updateInfo(System.currentTimeMillis(), node);
workerBufferStatusHub.add(System.currentTimeMillis(), node);
userBufferInfo.updateInfo(timeNow, node);
workerBufferStatusHub.add(timeNow, node);
}

public UserBufferInfo getUserBufferInfo() {
Expand Down

0 comments on commit e7e2883

Please sign in to comment.