Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 83 additions & 33 deletions src/main/java/org/apache/commons/lang3/concurrent/TimedSemaphore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package org.apache.commons.lang3.concurrent;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.commons.lang3.Validate;
Expand Down Expand Up @@ -123,6 +124,7 @@ public static class Builder implements Supplier<TimedSemaphore> {
private long period;
private TimeUnit timeUnit;
private int limit;
private boolean fair = false;

/**
* Constructs a new Builder.
Expand Down Expand Up @@ -179,6 +181,17 @@ public Builder setTimeUnit(final TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}

/**
* ADDED: Enables or disables FIFO fairness
*
* @param fair whether to enable fairness
* @return {@code this} instance.
*/
public Builder setFair(final boolean fair) {
this.fair = fair;
return this;
}
}

/**
Expand All @@ -190,6 +203,7 @@ public Builder setTimeUnit(final TimeUnit timeUnit) {
/** Constant for the thread pool size for the executor. */
private static final int THREAD_POOL_SIZE = 1;


/**
* Constructs a new Builder.
*
Expand Down Expand Up @@ -235,10 +249,18 @@ public static Builder builder() {
/** A flag whether shutdown() was called. */
private boolean shutdown; // @GuardedBy("this")

/** Fairness mode checker */
private final boolean fair;

/** Backing semaphore that enforces blocking and (optionally) FIFO fairness. */
private final Semaphore semaphore;


private TimedSemaphore(final Builder builder) {
Validate.inclusiveBetween(1, Long.MAX_VALUE, builder.period, "Time period must be greater than 0.");
period = builder.period;
unit = builder.timeUnit;
this.fair = builder.fair;
if (builder.service != null) {
executorService = builder.service;
ownExecutor = false;
Expand All @@ -250,6 +272,7 @@ private TimedSemaphore(final Builder builder) {
ownExecutor = true;
}
setLimit(builder.limit);
semaphore = new Semaphore(Math.max(0, this.limit), fair);
}

/**
Expand All @@ -264,6 +287,7 @@ private TimedSemaphore(final Builder builder) {
@Deprecated
public TimedSemaphore(final long timePeriod, final TimeUnit timeUnit, final int limit) {
this(null, timePeriod, timeUnit, limit);

}

/**
Expand All @@ -290,41 +314,48 @@ public TimedSemaphore(final ScheduledExecutorService service, final long timePer
* @throws InterruptedException if the thread gets interrupted.
* @throws IllegalStateException if this semaphore is already shut down.
*/
public synchronized void acquire() throws InterruptedException {
public void acquire() throws InterruptedException {
prepareAcquire();
boolean canPass;
do {
canPass = acquirePermit();
if (!canPass) {
wait();
if (getLimit() <= NO_LIMIT) {
synchronized (this) {
acquireCount++;
}
return;
}
if (semaphore.tryAcquire()) {
synchronized (this) {
acquireCount++;
}
return;
}
for (;;) {
synchronized (this) {
if (semaphore.tryAcquire()) {
acquireCount++;
return;
}
this.wait();
}
} while (!canPass);
}

/**
* Internal helper method for acquiring a permit. This method checks whether currently a permit can be acquired and - if so - increases the internal
* counter. The return value indicates whether a permit could be acquired. This method must be called with the lock of this object held.
*
* @return a flag whether a permit could be acquired.
*/
private boolean acquirePermit() {
if (getLimit() <= NO_LIMIT || acquireCount < getLimit()) {
acquireCount++;
return true;
}
return false;
}

/**
* The current time period is finished. This method is called by the timer used internally to monitor the time period. It resets the counter and releases
* the threads waiting for this barrier.
*/
synchronized void endOfPeriod() {
void endOfPeriod() {
lastCallsPerPeriod = acquireCount;
totalAcquireCount += acquireCount;
periodCount++;
acquireCount = 0;
notifyAll();
final int avail = semaphore.availablePermits();
final int toRelease = Math.max(0, getLimit() - avail);
if (toRelease > 0) {
semaphore.release(toRelease);
}
synchronized (this) {
this.notifyAll();
}
}

/**
Expand Down Expand Up @@ -420,14 +451,15 @@ public synchronized boolean isShutdown() {
* object held.
*/
private void prepareAcquire() {
if (isShutdown()) {
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) {
task = startTimer();
synchronized (this) {
if (isShutdown()) {
throw new IllegalStateException("TimedSemaphore is shut down!");
}
if (task == null) {
task = startTimer();
}
}
}

/**
* Sets the limit. This is the number of times the {@link #acquire()} method can be called within the time period specified. If this limit is reached,
* further invocations of {@link #acquire()} will block. Setting the limit to a value &lt;= {@link #NO_LIMIT} will cause the limit to be disabled, i.e. an
Expand All @@ -437,6 +469,17 @@ private void prepareAcquire() {
*/
public final synchronized void setLimit(final int limit) {
this.limit = limit;

if (semaphore != null) {
final int target = Math.max(0, limit);
final int used = Math.max(0, acquireCount);
final int toSeed = Math.max(0, target - used);

semaphore.drainPermits();
if (toSeed > 0) {
semaphore.release(toSeed);
}
}
}

/**
Expand All @@ -446,8 +489,6 @@ public final synchronized void setLimit(final int limit) {
public synchronized void shutdown() {
if (!shutdown) {
if (ownExecutor) {
// if the executor was created by this instance, it has
// to be shutdown
getExecutorService().shutdownNow();
}
if (task != null) {
Expand Down Expand Up @@ -475,8 +516,17 @@ protected ScheduledFuture<?> startTimer() {
* @throws IllegalStateException if this semaphore is already shut down.
* @since 3.5
*/
public synchronized boolean tryAcquire() {
public boolean tryAcquire() {
prepareAcquire();
return acquirePermit();
if (getLimit() <= NO_LIMIT) {
synchronized (this) { acquireCount++; }
return true;
}

final boolean ok = semaphore.tryAcquire();
if (ok) {
synchronized (this) { acquireCount++; }
}
return ok;
}
}
Loading
Loading