Skip to content

Commit

Permalink
addressing the review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmandeep Singh committed Jul 23, 2020
1 parent 7ed5f01 commit 941fb29
Showing 1 changed file with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,80 +55,77 @@ public ObjectPoolPartition(ObjectPool<T> pool, PoolConfig config,
this.totalCount = 0;
this.log = new CustomLogger(name, host);
this.takeSemaphore = new Semaphore(config.getMaxSize(), true);
for (int i = 0; i < config.getMinSize(); i++) {
try {
T object = objectFactory.create(host, socketTimeout, connectTimeout);
objectQueue.add(new Poolable<>(object, pool, host));
totalCount++;
}
catch (Exception e) {
log.warn("Error in initializing pool, error: " + e);
try {
for (int i = 0; i < config.getMinSize(); i++) {
T object = objectFactory.create(host, socketTimeout, connectTimeout);
objectQueue.add(new Poolable<>(object, pool, host));
totalCount++;
}
}
catch (Exception e) {
// skipping logging the exception as factories are already logging.
}
}

public void returnObject(Poolable<T> object)
{
takeSemaphore.release();
if (!objectFactory.validate(object.getObject())) {
log.debug(String.format("Invalid object...removing: %s ", object));
decreaseObject(object);
return;
}
try {
if (!objectFactory.validate(object.getObject())) {
log.debug(String.format("Invalid object...removing: %s ", object));
decreaseObject(object);
return;
}

log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
if (!objectQueue.offer(object)) {
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
decreaseObject(object);
log.debug(String.format("Returning object: %s to queue. Queue size: %d", object, objectQueue.size()));
if (!objectQueue.offer(object)) {
log.warn("Created more objects than configured. Created=" + totalCount + " QueueSize=" + objectQueue.size());
decreaseObject(object);
}
}
finally {
takeSemaphore.release();
}
}

public Poolable<T> getObject(boolean blocking)
{
try {
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("Cannot get a free object from the pool");
}

Poolable<T> object;
if (blocking) {
try {
try {
if (blocking) {
takeSemaphore.acquire();
object = objectQueue.take();
}
catch (InterruptedException e) {
throw new RuntimeException(e); // will never happen
else {
takeSemaphore.tryAcquire(config.getMaxWaitMilliseconds(), TimeUnit.MILLISECONDS);
object = tryGetObject();
}
object.setLastAccessTs(System.currentTimeMillis());
}
else
{
object = tryGetObject(objectQueue.poll());
catch (Exception e) {
takeSemaphore.release();
throw new RuntimeException("Cannot get a free object from the pool", e);
}
object.setLastAccessTs(System.currentTimeMillis());
return object;
}

private Poolable<T> tryGetObject(Poolable<T> poolable)
private Poolable<T> tryGetObject() throws Exception
{
Poolable<T> poolable = objectQueue.poll();
if (poolable == null)
{
try {
T object = objectFactory.create(host, socketTimeout, connectTimeout);
poolable = new Poolable<>(object, pool, host);
totalCount++;
log.debug(String.format("Increased pool size by %d, to new size: %d, current queue size: %d, delta: %d",
1, totalCount, objectQueue.size(), 1));
log.debug(String.format("Added a connection, new totalCount: %d, queueSize: %d", totalCount, objectQueue.size()));
}
catch (Exception e) {
log.warn(String.format("Unable to increase pool size. Pool state: totalCount=%d queueSize=%d delta=%d", totalCount, objectQueue.size(), 1), e);
// objectToReturn is not on the queue hence untracked, clean it up before forwarding exception
log.warn(String.format("Unable create a connection. Pool state: totalCount=%d queueSize=%d", totalCount, objectQueue.size()), e);
if (poolable != null) {
objectFactory.destroy(poolable.getObject());
poolable.destroy();
}
throw new RuntimeException(e);
throw e;
}
}
return poolable;
Expand Down

0 comments on commit 941fb29

Please sign in to comment.