Skip to content

Commit

Permalink
RATIS-1933. Two improvements on ReferenceCountedObject.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Nov 8, 2023
1 parent ecc1725 commit 8d09f6c
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,27 @@
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
* A reference-counted object can be retained for later use
* and then be released for returning the resource.
*
* <p>
* - When the object is retained, the reference count is incremented by 1.
*
* <p>
* - When the object is released, the reference count is decremented by 1.
*
* <p>
* - If the object is retained, it must be released afterward.
* Otherwise, the object will not be returned, and it will cause a resource leak.
*
* <p>
* - If the object is retained multiple times,
* it must be released the same number of times.
*
* <p>
* - If the object has been retained and then completely released (i.e. the reference count becomes 0),
* it must not be retained/released/accessed anymore since it may have been allocated for other use.
*
Expand All @@ -46,14 +50,44 @@ public interface ReferenceCountedObject<T> {
/**
* Retain the object for later use.
* The reference count will be increased by 1.
*
* <p>
* The {@link #release()} method must be invoked afterward.
* Otherwise, the object is not returned, and it will cause a resource leak.
*
* @return the object.
*/
T retain();

/**
* The same as {@link #retain()} except that this method returns a {@link UncheckedAutoCloseableSupplier}.
*
* @return a {@link UncheckedAutoCloseableSupplier}
* where {@link java.util.function.Supplier#get()} will return the retained object,
* i.e. the object returned by {@link #retain()},
* and calling {@link UncheckedAutoCloseable#close()} one or more times
* is the same as calling {@link #release()} once (idempotent).
*/
default UncheckedAutoCloseableSupplier<T> retainAndReleaseOnClose() {
final T retained = retain();
final AtomicBoolean closed = new AtomicBoolean();
return new UncheckedAutoCloseableSupplier<T>() {
@Override
public T get() {
if (closed.get()) {
throw new IllegalStateException("Already closed");
}
return retained;
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
release();
}
}
};
}

/**
* Release the object.
* The reference count will be decreased by 1.
Expand All @@ -64,19 +98,20 @@ public interface ReferenceCountedObject<T> {

/** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
static <V> ReferenceCountedObject<V> wrap(V value) {
return wrap(value, () -> {}, () -> {});
return wrap(value, () -> {}, ignored -> {});
}

/**
* Wrap the given value as a {@link ReferenceCountedObject}.
*
* @param value the value being wrapped.
* @param retainMethod a method to run when {@link #retain()} is invoked.
* @param releaseMethod a method to run when {@link #release()} is invoked.
* @param releaseMethod a method to run when {@link #release()} is invoked,
* where the method takes a boolean which is the same as the one returned by {@link #release()}.
* @param <V> The value type.
* @return the wrapped reference-counted object.
*/
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
Objects.requireNonNull(value, "value == null");
Objects.requireNonNull(retainMethod, "retainMethod == null");
Objects.requireNonNull(releaseMethod, "releaseMethod == null");
Expand Down Expand Up @@ -118,9 +153,15 @@ public boolean release() {
} else if (previous == 0) {
throw new IllegalStateException("Failed to release: object has not yet been retained.");
}
releaseMethod.run();
return previous == 1;
final boolean completedReleased = previous == 1;
releaseMethod.accept(completedReleased);
return completedReleased;
}
};
}

/** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util.function;

import org.apache.ratis.util.UncheckedAutoCloseable;

import java.util.function.Supplier;

/**
* A {@link Supplier} which is also {@link UncheckedAutoCloseable}.
*
* @param <T> the type of the {@link Supplier}.
*/
public interface UncheckedAutoCloseableSupplier<T> extends UncheckedAutoCloseable, Supplier<T> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -327,14 +328,12 @@ static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
wrapped.retain();
try {
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
buffer, buf::retain, ignored -> buf.release());
try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
throw new CompletionException(t);
} finally {
wrapped.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;

import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -58,17 +59,31 @@ public void testWrap() {
Assert.assertEquals(value, ref.retain());
assertValues(retained, 1, released, 0);

Assert.assertEquals(value, ref.retain());
assertValues(retained, 2, released, 0);

assertRelease(ref, retained, 2, released, 1);
try(UncheckedAutoCloseableSupplier<String> auto = ref.retainAndReleaseOnClose()) {
final String got = auto.get();
Assert.assertEquals(value, got);
Assert.assertSame(got, auto.get()); // it should return the same object.
assertValues(retained, 2, released, 0);
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}
assertValues(retained, 2, released, 1);

Assert.assertEquals(value, ref.retain());
final UncheckedAutoCloseableSupplier<String> notClosing = ref.retainAndReleaseOnClose();
Assert.assertEquals(value, notClosing.get());
assertValues(retained, 3, released, 1);

assertRelease(ref, retained, 3, released, 2);

assertRelease(ref, retained, 3, released, 3);
final UncheckedAutoCloseableSupplier<String> auto = ref.retainAndReleaseOnClose();
Assert.assertEquals(value, auto.get());
assertValues(retained, 4, released, 2);
auto.close();
assertValues(retained, 4, released, 3);
auto.close(); // close() is idempotent.
assertValues(retained, 4, released, 3);

// completely released
assertRelease(ref, retained, 4, released, 4);

try {
ref.get();
Expand All @@ -84,6 +99,12 @@ public void testWrap() {
e.printStackTrace(System.out);
}

try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
Assert.fail();
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}

try {
ref.release();
Assert.fail();
Expand All @@ -94,7 +115,7 @@ public void testWrap() {

@Test(timeout = 1000)
public void testReleaseWithoutRetaining() {
final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("", () -> {}, () -> {});
final ReferenceCountedObject<String> ref = ReferenceCountedObject.wrap("");

try {
ref.release();
Expand All @@ -116,5 +137,11 @@ public void testReleaseWithoutRetaining() {
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}

try(UncheckedAutoCloseable ignore = ref.retainAndReleaseOnClose()) {
Assert.fail();
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}
}
}

0 comments on commit 8d09f6c

Please sign in to comment.