Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace volatile with release/acquire where possible #4599

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public abstract class AbstractSailConnection implements SailConnection {
private final AbstractSail sailBase;

private volatile boolean txnActive;
private static final VarHandle TXN_ACTIVE;

private volatile boolean txnPrepared;

Expand All @@ -91,16 +92,6 @@ public abstract class AbstractSailConnection implements SailConnection {
private boolean isOpen = true;
private static final VarHandle IS_OPEN;

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

/**
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
* transaction.
Expand Down Expand Up @@ -138,17 +129,20 @@ public abstract class AbstractSailConnection implements SailConnection {

private IsolationLevel transactionIsolationLevel;

// used to decide if we need to call flush()
// Used to decide if we need to call flush(). Use the VarHandles below in relase/acquire mode instead.
private volatile boolean statementsAdded;
private volatile boolean statementsRemoved;

private static final VarHandle STATEMENTS_ADDED;
private static final VarHandle STATEMENTS_REMOVED;

/*--------------*
* Constructors *
*--------------*/

public AbstractSailConnection(AbstractSail sailBase) {
this.sailBase = sailBase;
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
if (debugEnabled) {
activeIterationsDebug = new ConcurrentHashMap<>();
} else {
Expand Down Expand Up @@ -177,7 +171,7 @@ protected void verifyIsOpen() throws SailException {
* @throws SailException if no transaction is active.
*/
protected void verifyIsActive() throws SailException {
if (!isActive()) {
if (!((boolean) TXN_ACTIVE.getAcquire(this))) {
throw new SailException("No active transaction");
}
}
Expand Down Expand Up @@ -215,7 +209,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
}

startTransactionInternal();
txnActive = true;
TXN_ACTIVE.setRelease(this, true);
} finally {
updateLock.unlock();
}
Expand Down Expand Up @@ -268,15 +262,15 @@ public final void close() throws SailException {
try {
forceCloseActiveOperations();

if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
logger.warn("Rolling back transaction due to connection close",
debugEnabled ? new Throwable() : null);
try {
// Use internal method to avoid deadlock: the public
// rollback method will try to obtain a connection lock
rollbackInternal();
} finally {
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
}
Expand Down Expand Up @@ -417,7 +411,7 @@ public final long size(Resource... contexts) throws SailException {
}

protected final boolean transactionActive() {
return txnActive;
return (boolean) TXN_ACTIVE.getAcquire(this);
}

/**
Expand Down Expand Up @@ -459,7 +453,7 @@ public final void prepare() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
prepareInternal();
txnPrepared = true;
}
Expand Down Expand Up @@ -489,12 +483,12 @@ public final void commit() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
if (!txnPrepared) {
prepareInternal();
}
commitInternal();
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
} finally {
Expand Down Expand Up @@ -525,11 +519,11 @@ public final void rollback() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
try {
rollbackInternal();
} finally {
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
} else {
Expand All @@ -553,7 +547,8 @@ public final void addStatement(Resource subj, IRI pred, Value obj, Resource... c
flushPendingUpdates();
}
addStatement(null, subj, pred, obj, contexts);
statementsAdded = true;

STATEMENTS_ADDED.setRelease(this, true);
}

@Override
Expand All @@ -562,7 +557,7 @@ public final void removeStatements(Resource subj, IRI pred, Value obj, Resource.
flushPendingUpdates();
}
removeStatement(null, subj, pred, obj, contexts);
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Override
Expand Down Expand Up @@ -604,7 +599,7 @@ public void addStatement(UpdateContext op, Resource subj, IRI pred, Value obj, R
startUpdate(op);
}
}
statementsAdded = true;
STATEMENTS_ADDED.setRelease(this, true);
}

/**
Expand Down Expand Up @@ -632,7 +627,7 @@ public void removeStatement(UpdateContext op, Resource subj, IRI pred, Value obj
startUpdate(op);
}
}
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Override
Expand Down Expand Up @@ -703,7 +698,7 @@ public final void clear(Resource... contexts) throws SailException {
try {
verifyIsActive();
clearInternal(contexts);
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
} finally {
updateLock.unlock();
}
Expand Down Expand Up @@ -836,19 +831,20 @@ public final void clearNamespaces() throws SailException {

@Override
public boolean pendingRemovals() {
return statementsRemoved;
return (boolean) STATEMENTS_REMOVED.getAcquire(this);

}

protected boolean pendingAdds() {
return statementsAdded;
return (boolean) STATEMENTS_ADDED.getAcquire(this);
}

protected void setStatementsAdded() {
statementsAdded = true;
STATEMENTS_ADDED.setRelease(this, true);
}

protected void setStatementsRemoved() {
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Deprecated(forRemoval = true)
Expand Down Expand Up @@ -992,9 +988,10 @@ private void forceCloseActiveOperations() throws SailException {
* @throws SailException
*/
private void flushPendingUpdates() throws SailException {
if ((statementsAdded || statementsRemoved) && isActive()) {
if ((pendingAdds() || pendingRemovals()) && isActive()) {
if (isActive()) {
synchronized (this) {
// we are inside a synchornized block so there isn't much point using the VarHandle
if ((statementsAdded || statementsRemoved) && isActive()) {
flush();
statementsAdded = false;
Expand Down Expand Up @@ -1128,4 +1125,44 @@ public synchronized void release() {
}
}
}

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
TXN_ACTIVE = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "txnActive", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
STATEMENTS_REMOVED = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "statementsRemoved", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
STATEMENTS_ADDED = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "statementsAdded", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class FileIO {

public static final int INF_QUAD_MARKER = 5;

public static final int URI_MARKER = 6;
public static final int IRI_MARKER = 6;

public static final int BNODE_MARKER = 7;

Expand Down Expand Up @@ -270,7 +270,7 @@ private void readStatement(boolean hasContext, boolean isExplicit, DataInputStre

private void writeValue(Value value, DataOutputStream dataOut) throws IOException {
if (value.isIRI()) {
dataOut.writeByte(URI_MARKER);
dataOut.writeByte(IRI_MARKER);
writeString(((IRI) value).stringValue(), dataOut);
} else if (value.isBNode()) {
dataOut.writeByte(BNODE_MARKER);
Expand Down Expand Up @@ -301,9 +301,9 @@ private void writeValue(Value value, DataOutputStream dataOut) throws IOExceptio
private Value readValue(DataInputStream dataIn) throws IOException, ClassCastException {
int valueTypeMarker = dataIn.readByte();

if (valueTypeMarker == URI_MARKER) {
String uriString = readString(dataIn);
return vf.createIRI(uriString);
if (valueTypeMarker == IRI_MARKER) {
String iriString = readString(dataIn);
return vf.createIRI(iriString);
} else if (valueTypeMarker == BNODE_MARKER) {
String bnodeID = readString(dataIn);
return vf.createBNode(bnodeID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private int minStatementCount(Value subj, Value pred, Value obj, Value context)
}

if (pred != null) {
MemIRI memPred = valueFactory.getMemURI((IRI) pred);
MemIRI memPred = valueFactory.getMemIRI((IRI) pred);
if (memPred != null) {
minListSizes = Math.min(minListSizes, memPred.getPredicateStatementCount());
if (minListSizes == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private CloseableIteration<MemStatement, SailException> createStatementIterator(
return EMPTY_ITERATION;
}

MemIRI memPred = valueFactory.getMemURI(pred);
MemIRI memPred = valueFactory.getMemIRI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_ITERATION;
Expand Down Expand Up @@ -345,7 +345,7 @@ private CloseableIteration<MemTriple, SailException> createTripleIterator(Resour
return EMPTY_TRIPLE_ITERATION;
}

MemIRI memPred = valueFactory.getMemURI(pred);
MemIRI memPred = valueFactory.getMemIRI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_TRIPLE_ITERATION;
Expand Down Expand Up @@ -592,7 +592,7 @@ public String toString() {
} else {
sb.append("inferred ");
}
if (txnLock) {
if ((boolean) TXN_LOCK.getAcquire(this)) {
sb.append("snapshot ").append(nextSnapshot);
} else {
sb.append(super.toString());
Expand Down Expand Up @@ -635,7 +635,7 @@ public synchronized void prepare() throws SailException {

@Override
public synchronized void flush() throws SailException {
if (txnLock) {
if ((boolean) TXN_LOCK.getAcquire(this)) {
invalidateCache();
currentSnapshot = Math.max(currentSnapshot, nextSnapshot);
if (requireCleanup) {
Expand All @@ -653,8 +653,8 @@ public void close() {
reservedSnapshot.release();
}
} finally {
boolean toCloseTxnLock = txnLock;
txnLock = false;
boolean toCloseTxnLock = (boolean) TXN_LOCK.getAcquire(this);
TXN_LOCK.setRelease(this, false);
if (toCloseTxnLock) {
txnLockManager.unlock();
}
Expand Down Expand Up @@ -792,7 +792,7 @@ private void innerDeprecate(Statement statement, int nextSnapshot) {
}

private void acquireExclusiveTransactionLock() throws SailException {
if (!txnLock) {
if (!(boolean) TXN_LOCK.getAcquire(this)) {
synchronized (this) {
if (!txnLock) {
txnLockManager.lock();
Expand All @@ -812,7 +812,7 @@ private MemStatement addStatement(Resource subj, IRI pred, Value obj, Resource c

// Get or create MemValues for the operands
MemResource memSubj = valueFactory.getOrCreateMemResource(subj);
MemIRI memPred = valueFactory.getOrCreateMemURI(pred);
MemIRI memPred = valueFactory.getOrCreateMemIRI(pred);
MemValue memObj = valueFactory.getOrCreateMemValue(obj);
MemResource memContext = context == null ? null : valueFactory.getOrCreateMemResource(context);

Expand Down Expand Up @@ -1118,7 +1118,7 @@ public ReservedSnapshot reserve(int snapshot, Object reservedBy) {
}
}

LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, (k) -> new LongAdder());
LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, k -> new LongAdder());
longAdder.increment();

return new ReservedSnapshot(snapshot, reservedBy, debug, longAdder, activeSnapshots,
Expand Down Expand Up @@ -1162,7 +1162,7 @@ public ReservedSnapshot(int snapshot, Object reservedBy, boolean debug,
this.frequency = frequency;
this.highestEverReservedSnapshot = highestEverReservedSnapshot;
cleanable = cleaner.register(reservedBy, () -> {
int tempSnapshot = ((int) SNAPSHOT.getVolatile(this));
int tempSnapshot = (int) SNAPSHOT.getVolatile(this);
if (tempSnapshot != SNAPSHOT_RELEASED) {
String message = "Releasing MemorySailStore snapshot {} which was reserved and never released (possibly unclosed MemorySailDataset or MemorySailSink).";
if (stackTraceForDebugging != null) {
Expand Down Expand Up @@ -1201,4 +1201,17 @@ public void release() {

}
}

private static final VarHandle TXN_LOCK;

static {
try {
TXN_LOCK = MethodHandles.lookup()
.in(MemorySailSink.class)
.findVarHandle(MemorySailSink.class, "txnLock", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

}
Loading